diff --git a/CHANGELOG.md b/CHANGELOG.md index 8fb10c5..4d908ea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,12 @@ # Unreleased +## [0.3.0] - 2024-07-05 + +- add new middleware option to disable auto-save state +- require storage adapter `update` function to update an existing entity. The function takes same arguments as `upsert` but will/should throw if the entity key was not found +- add default BPMN Engine service functions: `saveState`, `disableSaveState`, `enableSaveState`. The functions takes no arguments, at the moment + ## [0.2.0] - 2024-06-20 - add scripts endpoint that serves BPMN deployment javascripts module diff --git a/README.md b/README.md index 3615cf8..a237c84 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,8 @@ Express middleware for [BPMN engine](https://npmjs.com/package/bpmn-engine). Under construction so breaking changes will occur until v1. +- [Api documentation](./docs/API.md) + ## Usage ```javascript @@ -56,255 +58,3 @@ function errorHandler(err, req, res, next) { res.status(502).send({ message: err.message }); } ``` - -## Middleware - -### `bpmnEngineMiddleware([options])` - -Create BPMN engine middleware. - -Options: - -- `adapter`: Optional [storage adapter](#storage-adapter). Defaults to in-memory adapter based on LRU cache -- `engineOptions`: Optional BPMN Engine [options](https://github.com/paed01/bpmn-engine/blob/master/docs/API.md) -- `engineCache`: Optional engine LRU in-memory cache, defaults to `new LRUCache({ max: 1000 })` -- `broker`: Optional [smqp](https://npmjs.com/package/smqp) broker, used for forwarding events from executing engines -- `idleTimeout`: Optional positive integer, engine execution timeout in milliseconds before engine execution is considered idle and is stopped, defaults to 120000ms - -Returns Expressjs Router with extra properties: - -- `middleware`: middleware route functions -- `engines`: BPMN engines handler - -## Routes - -- [`GET (*)?/version`](#get-version) -- [`GET (*)?/deployment`](#get-deployment) -- [`POST (*)?/deployment/create`](#post-deploymentcreate) -- [`POST (*)?/process-definition/:deploymentName/start`](#post-process-definitiondeploymentnamestart) -- [`GET (*)?/script/:deploymentName`](#get-scriptdeploymentname) -- [`GET (*)?/running`](#get-running) -- [`GET (*)?/status/:token`](#get-statustoken) -- [`GET (*)?/status/:token/:activityId`](#get-statustokenactivityid) -- [`POST (*)?/resume/:token`](#post-resumetoken) -- [`POST (*)?/signal/:token`](#post-signaltoken) -- [`POST (*)?/cancel/:token`](#post-canceltoken) -- [`POST (*)?/fail/:token`](#post-failtoken) -- [`GET (*)?/state/:token`](#get-statetoken) -- [`DELETE (*)?/state/:token`](#delete-statetoken) -- [`DELETE (*)?/internal/stop`](#delete-internalstop) -- [`DELETE (*)?/internal/stop/:token`](#delete-internalstoptoken) - -### `GET (*)?/version` - -Get app version. - -Response body: - -- `version`: string, resolved from `process.cwd() + '/package.json` - -### `GET (*)?/deployment` - -Get app name. - -Response body: - -- `name`: string, resolved from `process.cwd() + '/package.json` - -### `POST (*)?/deployment/create` - -Create deployment by passing multipart form with BPMN diagram file. - -Content-type: `multipart/form-data` - -Form fields: - -- `deployment-name`: string, deployment name; -- `deployment-source`: string, deployment source; - -Response body: - -- `id`: string, same as deployment name -- `deploymentTime`: date, now -- `deployedProcessDefinitions`: object - - `[deploymentName]`: object, key as deployment name - - `id`: string, same as deployment name - -### `POST (*)?/process-definition/:deploymentName/start` - -Start deployment. - -Params: - -- `deploymentName`: deployment name - -Request body: - -- `businessKey`: string, business key -- `variables`: optional object with variables to pass to engine - -Response body: - -- `id`: string, unique execution token - -### `GET (*)?/script/:deploymentName` - -Get all declared scripts for deployment - -Response: - -- `content-type: text/javascript` -- `body`: module script, exported javascript functions where function name non-word characters are replaced with `_` - -### `GET (*)?/running` - -Get all running instances. - -Response body: - -- `engines`: list of executing engines - - `token`: string, unique execution token - - `name`: string, deployment name - - `state`: string, engine status, `idle`, `running`, `stopped`, or `error` - - `activityStatus`: string, running activity status, `idle`, `executing`, `timer`, or `wait` - -### `GET (*)?/status/:token` - -Get process status - -### `GET (*)?/status/:token/:activityId` - -Get process activity status - -### `POST (*)?/resume/:token` - -Resume process run - -### `POST (*)?/signal/:token` - -Signal process activity. - -Request body: - -- `id`: activity id -- `executionId`: optional activity execution id -- `message`: optional message to signal activity with - -### `POST (*)?/cancel/:token` - -Cancel process activity. - -Request body: - -- `id`: activity id -- `executionId`: optional activity execution id - -### `POST (*)?/fail/:token` - -Fail process activity. - -Request body: - -- `id`: activity id -- `executionId`: optional activity execution id -- `message`: optional message to send to activity - -### `GET (*)?/state/:token` - -Get process engine state. - -### `DELETE (*)?/state/:token` - -Delete process engine state. - -### `DELETE (*)?/internal/stop` - -Stop all running instances on this specific app instance. - -### `DELETE (*)?/internal/stop/:token` - -Stop running instances by token on this specific app instance. - -## Events - -BPMN Engine will forward BPMN engine events to app prefixed by `bpmn/`. - -### Event `bpmn/end` - -BPMN Engine has completed successfully. - -Handler arguments: - -- `engine`: Engine instance - -### Event `bpmn/stop` - -BPMN Engine execution has stopped. - -Handler arguments: - -- `engine`: Engine instance - -### Event `bpmn/error` - -BPMN Engine execution has failed. - -Handler arguments: - -- `err`: Error -- `engine`: Engine instance - -## Storage adapter - -Persistent storage adapter, defaults to in memory storage. - -Three types will be saved to adapter: - -- `deployment`: BPMN deployment with references to BPMN files -- `file`: BPMN file with meta and content -- `state`: BPMN engine state - -### `async upsert(type, key, value[, options])` - -Set entry with key. - -- `type`: string, storage type, `deployment`, `file`, or `state` -- `key`: string, storage key -- `value`: object, value -- `options`: optional object with options - -### `async delete(type, key)` - -Delete entry by key. - -- `type`: string, storage type, `deployment`, `file`, or `state` -- `key`: string, storage key - -### `async fetch(type, key[, options])` - -Fetch entry by key. - -- `type`: string, storage type, `deployment`, `file`, or `state` -- `key`: string, storage key -- `options`: optional object with options - - `exclude`: optional list of fields to exclude - -### `async query(type, qs[, options])` - -Query entries. - -- `type`: string, storage type, `deployment`, `file`, or `state` -- `qs`: object, storage query - - `exclude`: optional list of fields to exclude - - `state`: optional string, get engine states by state of engine, `idle`, `running`, etc - - `caller`: optional object, get engines by call activity caller - - `token`: string, calling process token - - `deployment`: string, calling process deployment name - - `id`: string, calling activity id - - `type`: string, calling activity type - - `executionId`: string, calling activity execution id -- `options`: optional object with options - -Returns: - -- `records`: List of entries diff --git a/docs/API.md b/docs/API.md index 0e35ebb..48c5fb1 100644 --- a/docs/API.md +++ b/docs/API.md @@ -1,3 +1,300 @@ -# API +## API -## `new BpmnEngineMiddleware(options, engines)` +### `bpmnEngineMiddleware([options])` + +Create BPMN engine middleware. + +Options: + +- `adapter`: Optional [storage adapter](#storage-adapter). Defaults to in-memory adapter based on LRU cache +- `engineOptions`: Optional BPMN Engine [options](https://github.com/paed01/bpmn-engine/blob/master/docs/API.md) +- `engineCache`: Optional engine LRU in-memory cache, defaults to `new LRUCache({ max: 1000 })` +- `broker`: Optional [smqp](https://npmjs.com/package/smqp) broker, used for forwarding events from executing engines +- `idleTimeout`: Optional positive integer, engine execution timeout in milliseconds before engine execution is considered idle and is stopped, defaults to 120000ms +- `autosaveEngineState`: Optional boolean, auto-save engine state during execution, defaults to true + +Returns Expressjs Router with extra properties: + +- `middleware`: middleware route functions +- `engines`: BPMN engines handler + +## Routes + +- [`GET (*)?/version`](#get-version) +- [`GET (*)?/deployment`](#get-deployment) +- [`POST (*)?/deployment/create`](#post-deploymentcreate) +- [`POST (*)?/process-definition/:deploymentName/start`](#post-process-definitiondeploymentnamestart) +- [`GET (*)?/script/:deploymentName`](#get-scriptdeploymentname) +- [`GET (*)?/running`](#get-running) +- [`GET (*)?/status/:token`](#get-statustoken) +- [`GET (*)?/status/:token/:activityId`](#get-statustokenactivityid) +- [`POST (*)?/resume/:token`](#post-resumetoken) +- [`POST (*)?/signal/:token`](#post-signaltoken) +- [`POST (*)?/cancel/:token`](#post-canceltoken) +- [`POST (*)?/fail/:token`](#post-failtoken) +- [`GET (*)?/state/:token`](#get-statetoken) +- [`DELETE (*)?/state/:token`](#delete-statetoken) +- [`DELETE (*)?/internal/stop`](#delete-internalstop) +- [`DELETE (*)?/internal/stop/:token`](#delete-internalstoptoken) + +### `GET (*)?/version` + +Get app version. + +Response body: + +- `version`: string, resolved from `process.cwd() + '/package.json` + +### `GET (*)?/deployment` + +Get app name. + +Response body: + +- `name`: string, resolved from `process.cwd() + '/package.json` + +### `POST (*)?/deployment/create` + +Create deployment by passing multipart form with BPMN diagram file. + +Content-type: `multipart/form-data` + +Form fields: + +- `deployment-name`: string, deployment name; +- `deployment-source`: string, deployment source; + +Response body: + +- `id`: string, same as deployment name +- `deploymentTime`: date, now +- `deployedProcessDefinitions`: object + - `[deploymentName]`: object, key as deployment name + - `id`: string, same as deployment name + +### `POST (*)?/process-definition/:deploymentName/start` + +Start deployment. + +Params: + +- `deploymentName`: deployment name + +Request body: + +- `businessKey`: string, business key +- `variables`: optional object with variables to pass to engine + +Response body: + +- `id`: string, unique execution token + +### `GET (*)?/script/:deploymentName` + +Get all declared scripts for deployment + +Response: + +- `content-type: text/javascript` +- `body`: module script, exported javascript functions where function name non-word characters are replaced with `_` + +### `GET (*)?/running` + +Get all running instances. + +Response body: + +- `engines`: list of executing engines + - `token`: string, unique execution token + - `name`: string, deployment name + - `state`: string, engine status, `idle`, `running`, `stopped`, or `error` + - `activityStatus`: string, running activity status, `idle`, `executing`, `timer`, or `wait` + +### `GET (*)?/status/:token` + +Get process status + +### `GET (*)?/status/:token/:activityId` + +Get process activity status + +### `POST (*)?/resume/:token` + +Resume process run + +### `POST (*)?/signal/:token` + +Signal process activity. + +Request body: + +- `id`: activity id +- `executionId`: optional activity execution id +- `message`: optional message to signal activity with + +### `POST (*)?/cancel/:token` + +Cancel process activity. + +Request body: + +- `id`: activity id +- `executionId`: optional activity execution id + +### `POST (*)?/fail/:token` + +Fail process activity. + +Request body: + +- `id`: activity id +- `executionId`: optional activity execution id +- `message`: optional message to send to activity + +### `GET (*)?/state/:token` + +Get process engine state. + +### `DELETE (*)?/state/:token` + +Delete process engine state. + +### `DELETE (*)?/internal/stop` + +Stop all running instances on this specific app instance. + +### `DELETE (*)?/internal/stop/:token` + +Stop running instances by token on this specific app instance. + +## Events + +BPMN Engine will forward BPMN engine events to app prefixed by `bpmn/`. + +### Event `bpmn/end` + +BPMN Engine has completed successfully. + +Handler arguments: + +- `engine`: Engine instance + +### Event `bpmn/stop` + +BPMN Engine execution has stopped. + +Handler arguments: + +- `engine`: Engine instance + +### Event `bpmn/error` + +BPMN Engine execution has failed. + +Handler arguments: + +- `err`: Error +- `engine`: Engine instance + +## Storage adapter + +Persistent storage adapter, defaults to in memory storage. + +Three types will be saved to adapter: + +- `deployment`: BPMN deployment with references to BPMN files +- `file`: BPMN file with meta and content +- `state`: BPMN engine state + +### `async upsert(type, key, value[, options])` + +Upsert entry with key. + +- `type`: string, storage type, `deployment`, `file`, or `state` +- `key`: string, storage key +- `value`: object, value +- `options`: optional object with options + +### `async update(type, key, value[, options])` + +Update entry with key. + +- `type`: string, storage type, `deployment`, `file`, or `state` +- `key`: string, storage key +- `value`: object, value +- `options`: optional object with options + +If the key was not found throw an [error with code](#storage-key-not-found) `ERR_BPMN_MIDDLEWARE_STORAGE_KEY_NOT_FOUND` to facilitate saving state. The error code should be among the exported constants of this project. + +### `async fetch(type, key[, options])` + +Fetch entry by key. + +- `type`: string, storage type, `deployment`, `file`, or `state` +- `key`: string, storage key +- `options`: optional object with options + - `exclude`: optional list of fields to exclude + +### `async delete(type, key)` + +Delete entry by key. + +- `type`: string, storage type, `deployment`, `file`, or `state` +- `key`: string, storage key + +### `async query(type, qs[, options])` + +Query entries. + +- `type`: string, storage type, `deployment`, `file`, or `state` +- `qs`: object, storage query + - `exclude`: optional list of fields to exclude + - `state`: optional string, get engine states by state of engine, `idle`, `running`, etc + - `caller`: optional object, get engines by call activity caller + - `token`: string, calling process token + - `deployment`: string, calling process deployment name + - `id`: string, calling activity id + - `type`: string, calling activity type + - `executionId`: string, calling activity execution id +- `options`: optional object with options + +Returns: + +- `records`: List of entries + +### Storage adapter examples + +#### Storage key not found + +```javascript +import assert from 'node:assert'; +import { LRUCache } from 'lru-cache'; + +import { StorageError, STORAGE_TYPE_STATE, ERR_STORAGE_KEY_NOT_FOUND } from 'bpmn-middleware'; + +class MyStorageAdapter { + constructor() { + this._data = new LRUCache({ max: 1000 }); + } + async upsert(type, key, value, options) {} + async update(type, key, value, options) { + if (!this._data.has(`${type}:${key}`)) throw new StorageError(`${type}:key not found`, ERR_STORAGE_KEY_NOT_FOUND); + return this.upsert(type, key, value, options); + } + async fetch(type, key, value, options) {} + async delete(type, key) {} + async query(type, qs, options) {} +} + +(async () => { + const adapter = new MyStorageAdapter(); + + try { + await adapter.update(STORAGE_TYPE_STATE, 'madeuptoken', {}); + } catch (err) { + var error = err; + } + + assert.equal(error?.code, ERR_STORAGE_KEY_NOT_FOUND); +})(); +``` diff --git a/eslint.config.js b/eslint.config.js index 21f2461..e9ee656 100644 --- a/eslint.config.js +++ b/eslint.config.js @@ -85,6 +85,7 @@ export default [ ...globals.mocha, expect: 'readonly', beforeEachScenario: 'readonly', + afterEachScenario: 'readonly', Buffer: 'readonly', Feature: 'readonly', Scenario: 'readonly', diff --git a/package.json b/package.json index 1a47197..9c9b429 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "bpmn-middleware", - "version": "0.2.0", + "version": "0.3.0", "description": "BPMN engine express middleware", "type": "module", "main": "./dist/main.cjs", @@ -15,7 +15,7 @@ }, "scripts": { "test": "mocha", - "posttest": "npm run lint && npm run dist", + "posttest": "npm run lint && npm run dist && texample ./README.md,./docs/API.md", "lint": "eslint . --cache && prettier . --check --cache", "prepack": "npm run dist", "cov:html": "c8 -r html -r text mocha", @@ -47,12 +47,13 @@ "@types/bpmn-moddle": "^5.1.11", "@types/express": "^4.17.17", "@types/multer": "^1.4.11", + "@types/supertest": "^6.0.2", "bpmn-engine": "^22.0.1", "c8": "^10.1.2", "camunda-bpmn-moddle": "^7.0.1", "chai": "^5.1.1", "chronokinesis": "^6.0.0", - "dts-buddy": "^0.4.7", + "dts-buddy": "^0.5.1", "eslint": "^9.3.0", "express": "^4.18.2", "form-data": "^4.0.0", diff --git a/src/Engines.js b/src/Engines.js index da256c3..ee04ce3 100644 --- a/src/Engines.js +++ b/src/Engines.js @@ -1,8 +1,14 @@ import { LRUCache } from 'lru-cache'; import { MiddlewareEngine } from './MiddlewareEngine.js'; -import { STORAGE_TYPE_STATE } from './constants.js'; import { HttpError } from './Errors.js'; +import { + STORAGE_TYPE_STATE, + SAVE_STATE_ROUTINGKEY, + ENABLE_SAVE_STATE_ROUTINGKEY, + DISABLE_SAVE_STATE_ROUTINGKEY, + ERR_STORAGE_KEY_NOT_FOUND, +} from './constants.js'; /** * Engines class @@ -10,10 +16,11 @@ import { HttpError } from './Errors.js'; */ export function Engines(options) { this.broker = options.broker; - this.engineOptions = options.engineOptions; + this.engineOptions = options.engineOptions || {}; this.idleTimeout = options.idleTimeout; this.adapter = options.adapter; this.engineCache = options.engineCache || new LRUCache({ max: 1000 }); + this.autosaveEngineState = options.autosaveEngineState; // @ts-ignore this.__onStateMessage = this._onStateMessage.bind(this); @@ -64,8 +71,10 @@ Engines.prototype.resume = async function resume(token, listener) { throw new HttpError(`Token ${token} not found`, 404); } - if (state?.state === 'idle') { + if (state.state === 'idle') { throw new HttpError(`Token ${token} has already completed`, 400); + } else if (state.state === 'error') { + throw new HttpError(`Token ${token} has failed`, 400); } // @ts-ignore @@ -111,7 +120,12 @@ Engines.prototype.resume = async function resume(token, listener) { Engines.prototype.signalActivity = async function signalActivity(token, listener, body) { const engine = await this.resume(token, listener); - engine.execution.signal(body); + await new Promise((resolve) => { + process.nextTick(() => { + engine.execution.signal(body); + resolve(); + }); + }); return engine; }; @@ -157,6 +171,7 @@ Engines.prototype.getPostponed = async function getPostponed(token, listener) { return { token, ...api.content, + // @ts-ignore executing: api.getExecuting()?.map((e) => ({ ...e.content })), }; }); @@ -259,6 +274,7 @@ Engines.prototype.createEngine = function createEngine(executeOptions) { source, listener, settings: { + autosaveEngineState: this.autosaveEngineState, ...this.engineOptions?.settings, idleTimeout, ...settings, @@ -268,6 +284,9 @@ Engines.prototype.createEngine = function createEngine(executeOptions) { ...variables, token, }, + services: { + ...this.engineOptions?.services, + }, token, sequenceNumber: 0, caller, @@ -313,6 +332,64 @@ Engines.prototype.getEngineStatus = function getEngineStatus(engine) { return result; }; +/** + * Create engine state + * @param {MiddlewareEngine} engine + */ +Engines.prototype.createEngineState = function createEngineState(engine) { + const { token, expireAt, sequenceNumber, caller } = engine.options; + /** @type {import('types').MiddlewareEngineState} */ + const state = { + token, + name: engine.name, + expireAt, + sequenceNumber, + ...(caller && { caller }), + }; + + /** @type {import('types').postponed[]} */ + const postponed = (state.postponed = []); + for (const elmApi of engine.execution.getPostponed()) { + postponed.push({ id: elmApi.id, type: elmApi.type }); + + if (elmApi.content.isSubProcess) { + for (const subElmApi of elmApi.getPostponed()) { + if (subElmApi.id === elmApi.id) continue; + postponed.push({ id: subElmApi.id, type: subElmApi.type }); + } + } + } + + if (!engine.stopped) { + state.activityStatus = engine.activityStatus; + state.state = engine.state; + } + + state.engine = engine.execution.getState(); + + return state; +}; + +/** + * Save engine state + * @param {MiddlewareEngine} engine + * @param {boolean} [ifExists] save engine state if existing state + */ +Engines.prototype.saveEngineState = async function saveEngineState(engine, ifExists) { + const state = this.createEngineState(engine); + if (ifExists) { + try { + await this.adapter.update(STORAGE_TYPE_STATE, state.token, state); + } catch (err) { + // @ts-ignore + if (err.code === ERR_STORAGE_KEY_NOT_FOUND) return; + throw err; + } + } else { + await this.adapter.upsert(STORAGE_TYPE_STATE, state.token, state); + } +}; + /** * Internal setup engine listeners * @param {MiddlewareEngine} engine @@ -324,6 +401,10 @@ Engines.prototype._setupEngine = function setupEngine(engine) { engineOptions.sequenceNumber = engineOptions.sequenceNumber ?? 0; + engine.environment.addService('saveState', saveState); + engine.environment.addService('enableSaveState', enableSaveState); + engine.environment.addService('disableSaveState', disableSaveState); + if (parentBroker) { parentBroker.assertExchange('event', 'topic', { durable: false, autoDelete: false }); engineBroker.createShovel( @@ -353,6 +434,10 @@ Engines.prototype._setupEngine = function setupEngine(engine) { ); engineBroker.assertExchange('state', 'topic', { durable: false, autoDelete: false }); + + engineBroker.bindExchange('event', 'state', SAVE_STATE_ROUTINGKEY); + engineBroker.bindExchange('event', 'state', ENABLE_SAVE_STATE_ROUTINGKEY); + engineBroker.bindExchange('event', 'state', DISABLE_SAVE_STATE_ROUTINGKEY); engineBroker.bindExchange('event', 'state', 'activity.wait'); engineBroker.bindExchange('event', 'state', 'activity.call'); engineBroker.bindExchange('event', 'state', 'activity.timer'); @@ -380,9 +465,28 @@ Engines.prototype._onStateMessage = async function onStateMessage(routingKey, me if (message.content.isRecovered) return message.ack(); const engineOptions = engine.options; + /** @type {boolean} */ + const autosaveEngineState = engine.environment.settings.autosaveEngineState; + + let saveState = autosaveEngineState; + let saveStateIfExists = false; try { switch (routingKey) { + case SAVE_STATE_ROUTINGKEY: { + saveState = true; + break; + } + case ENABLE_SAVE_STATE_ROUTINGKEY: { + engine.environment.settings.autosaveEngineState = true; + saveState = false; + break; + } + case DISABLE_SAVE_STATE_ROUTINGKEY: { + engine.environment.settings.autosaveEngineState = false; + saveState = false; + break; + } case 'engine.end': this._teardownEngine(engine); engineOptions.expireAt = undefined; @@ -392,11 +496,14 @@ Engines.prototype._onStateMessage = async function onStateMessage(routingKey, me this._teardownEngine(engine); engineOptions.listener.emit(message.properties.type, engine); break; - case 'engine.error': + case 'engine.error': { this._teardownEngine(engine); engineOptions.expireAt = undefined; engineOptions.listener.emit('error', message.content, engine); + saveState = true; + saveStateIfExists = true; break; + } case 'activity.timer': { if (message.content.expireAt) { const currentExpireAt = (engineOptions.expireAt = engine.expireAt); @@ -410,7 +517,9 @@ Engines.prototype._onStateMessage = async function onStateMessage(routingKey, me break; } - await this._saveEngineState(engine); + if (saveState) { + await this.saveEngineState(engine, saveStateIfExists); + } } catch (err) { this._teardownEngine(engine); engine.stop(); @@ -420,37 +529,6 @@ Engines.prototype._onStateMessage = async function onStateMessage(routingKey, me message.ack(); }; -/** - * Internal save engine state - * @param {MiddlewareEngine} engine - */ -Engines.prototype._saveEngineState = async function saveEngineState(engine) { - const { token, expireAt, sequenceNumber, caller } = engine.options; - /** @type {import('types').MiddlewareEngineState} */ - const state = { - token, - name: engine.name, - expireAt, - sequenceNumber, - ...(caller && { caller }), - }; - - /** @type {import('types').postponed[]} */ - const postponed = (state.postponed = []); - for (const elm of engine.execution.getPostponed()) { - postponed.push({ id: elm.id, type: elm.type }); - } - - if (!engine.stopped) { - state.activityStatus = engine.activityStatus; - state.state = engine.state; - } - - state.engine = engine.execution.getState(); - - await this.adapter.upsert(STORAGE_TYPE_STATE, token, state); -}; - /** * Internal teardown engine, remove listeners and stuff * @param {MiddlewareEngine} engine @@ -486,3 +564,51 @@ Engines.prototype._getActivityApi = function getActivityApi(engine, body) { // @ts-ignore return activity.getApi(); }; + +/** + * Save state service function + * @this import('bpmn-elements').Activity + * @param {any[]} args + */ +function saveState(...args) { + const callback = args.pop(); + const msg = args.pop(); + + if (!msg?.content?.isRecovered) { + this.broker.publish('event', SAVE_STATE_ROUTINGKEY, {}); + } + + callback(null, true); +} + +/** + * Enable auto-save state service function + * @this import('bpmn-elements').Activity + * @param {any[]} args + */ +function enableSaveState(...args) { + const callback = args.pop(); + const msg = args.pop(); + + if (!msg?.content?.isRecovered) { + this.broker.publish('event', ENABLE_SAVE_STATE_ROUTINGKEY, {}); + } + + callback(null, true); +} + +/** + * Enable auto-save state service function + * @this import('bpmn-elements').Activity + * @param {any[]} args + */ +function disableSaveState(...args) { + const callback = args.pop(); + const msg = args.pop(); + + if (!msg?.content?.isRecovered) { + this.broker.publish('event', DISABLE_SAVE_STATE_ROUTINGKEY, {}); + } + + callback(null, true); +} diff --git a/src/Errors.js b/src/Errors.js index 27eba06..7096db5 100644 --- a/src/Errors.js +++ b/src/Errors.js @@ -10,3 +10,16 @@ export class HttpError extends Error { this.statusCode = statusCode; } } + +export class StorageError extends Error { + /** + * Error with status code + * @param {string} message Error message + * @param {string} code Error code + */ + constructor(message, code) { + super(message); + Error.captureStackTrace(this, StorageError); + this.code = code; + } +} diff --git a/src/MemoryAdapter.js b/src/MemoryAdapter.js index 846b68c..a499fbe 100644 --- a/src/MemoryAdapter.js +++ b/src/MemoryAdapter.js @@ -1,6 +1,7 @@ import { LRUCache } from 'lru-cache'; -import { STORAGE_TYPE_STATE } from './constants.js'; +import { StorageError } from './Errors.js'; +import { STORAGE_TYPE_STATE, ERR_STORAGE_KEY_NOT_FOUND } from './constants.js'; /** * Memory adapter @@ -13,10 +14,10 @@ export function MemoryAdapter(storage) { /** * Upsert - * @param {string} type - * @param {string} key - * @param {any} value - * @param {any} [options] + * @param {string} type storage type + * @param {string} key storage key + * @param {any} value value to store + * @param {any} [options] storage set options */ MemoryAdapter.prototype.upsert = function upsert(type, key, value, options) { const storageKey = `${type}:${key}`; @@ -34,6 +35,23 @@ MemoryAdapter.prototype.upsert = function upsert(type, key, value, options) { return Promise.resolve(); }; +/** + * Update existing + * @param {string} type storage type + * @param {string} key storage key + * @param {any} value value to store + * @param {any} [options] storage set options + * @throws {StorageError} + */ +MemoryAdapter.prototype.update = function upsert(type, key, value, options) { + const storageKey = `${type}:${key}`; + if (!this.storage.has(storageKey)) { + return Promise.reject(new StorageError(`${storageKey} not found`, ERR_STORAGE_KEY_NOT_FOUND)); + } + + return this.upsert(type, key, value, options); +}; + /** * Delete * @param {string} type @@ -48,7 +66,7 @@ MemoryAdapter.prototype.delete = function deleteByKey(type, key) { * Fetch * @param {string} type * @param {string} key - * @param {any} [options] + * @param {any} [options] Passed as fetch options to LRU cache */ MemoryAdapter.prototype.fetch = async function fetch(type, key, options) { const value = await this.storage.fetch(`${type}:${key}`, options); diff --git a/src/MiddlewareEngine.js b/src/MiddlewareEngine.js index 5e0bb9b..9936fcb 100644 --- a/src/MiddlewareEngine.js +++ b/src/MiddlewareEngine.js @@ -14,12 +14,16 @@ export class MiddlewareEngine extends Engine { * @type {string} */ this.token = token; - /** @type {import('bpmn-elements').Timer | null | void} */ + /** + * Execution idle timer + * @type {import('bpmn-elements').Timer | null | void} + */ this.idleTimer = null; this.engineTimers = this.environment.timers.register({ id: token }); } /** * Closest due time when a registered timer expires + * Ignores idle timer */ get expireAt() { /** @type {Date?} */ @@ -37,22 +41,27 @@ export class MiddlewareEngine extends Engine { */ startIdleTimer() { const delay = this.environment.settings.idleTimeout ?? 120000; - const timers = this.engineTimers; + const engineTimers = this.engineTimers; const current = this.idleTimer; - if (current) this.idleTimer = timers.clearTimeout(current); + if (current) this.idleTimer = engineTimers.clearTimeout(current); if (this.state !== 'running') return; - this.idleTimer = timers.setTimeout(() => { + this.idleTimer = engineTimers.setTimeout(() => { const status = this._getCurrentStatus(); switch (status.activityStatus) { case 'executing': break; - default: { - const expireAt = status.expireAt; - if (expireAt !== null && expireAt < new Date(Date.now() + delay * 2)) break; + case 'wait': { this.idleTimer = null; return this.stop(); } + case 'timer': { + if (status.expireAt > new Date(Date.now() + delay * 2)) { + this.idleTimer = null; + return this.stop(); + } + break; + } } this.startIdleTimer(); diff --git a/src/constants.js b/src/constants.js index c5eea7f..dbeeaac 100644 --- a/src/constants.js +++ b/src/constants.js @@ -2,3 +2,7 @@ export const STORAGE_TYPE_DEPLOYMENT = 'deployment'; export const STORAGE_TYPE_STATE = 'state'; export const STORAGE_TYPE_FILE = 'file'; export const DEFAULT_IDLE_TIMER = 120000; +export const SAVE_STATE_ROUTINGKEY = 'activity.state.save'; +export const ENABLE_SAVE_STATE_ROUTINGKEY = 'activity.state.save.enable'; +export const DISABLE_SAVE_STATE_ROUTINGKEY = 'activity.state.save.disable'; +export const ERR_STORAGE_KEY_NOT_FOUND = 'ERR_BPMN_MIDDLEWARE_STORAGE_KEY_NOT_FOUND'; diff --git a/src/index.js b/src/index.js index 98394db..315b5e8 100644 --- a/src/index.js +++ b/src/index.js @@ -9,7 +9,7 @@ import { STORAGE_TYPE_DEPLOYMENT, STORAGE_TYPE_FILE, STORAGE_TYPE_STATE, DEFAULT import { MulterAdapterStorage } from './MulterAdapterStorage.js'; import { Engines } from './Engines.js'; import { MemoryAdapter } from './MemoryAdapter.js'; -import { HttpError } from './Errors.js'; +import { HttpError, StorageError } from './Errors.js'; import { MiddlewareEngine } from './MiddlewareEngine.js'; import { fromActivityApi } from './Caller.js'; @@ -18,7 +18,7 @@ const nodeRequire = createRequire(fileURLToPath(import.meta.url)); const packageInfo = nodeRequire(join(process.cwd(), 'package.json')); const kInitilialized = Symbol.for('initialized'); -export { Engines, MemoryAdapter, HttpError, MiddlewareEngine }; +export { Engines, MemoryAdapter, HttpError, StorageError, MiddlewareEngine }; export * from './constants.js'; const snakeReplacePattern = /\W/g; @@ -31,10 +31,12 @@ export function bpmnEngineMiddleware(options) { const adapter = options?.adapter || new MemoryAdapter(); const engines = new Engines({ adapter, - engineOptions: { ...options?.engineOptions }, - engineCache: options?.engineCache, - broker: options?.broker, - idleTimeout: options?.idleTimeout ?? DEFAULT_IDLE_TIMER, + idleTimeout: DEFAULT_IDLE_TIMER, + autosaveEngineState: true, + ...options, + // engineOptions: { ...options?.engineOptions }, + // engineCache: options?.engineCache, + // broker: options?.broker, }); const storage = new MulterAdapterStorage(adapter); @@ -121,14 +123,6 @@ BpmnEngineMiddleware.prototype.init = function init(req, _, next) { return next(); }; -/** - * BPMN middleware locals - * @typedef {Object} BpmnMiddlewareLocals - * @property {Engines} engines - Engine factory - * @property {import('types').IStorageAdapter} adapter - Storage adapter - * @property {BpmnPrefixListener} listener - Bpmn engine listener - */ - /** * Add middleware response locals * @param {import('express').Request} req @@ -540,7 +534,7 @@ BpmnPrefixListener.prototype.emit = function emitBpmnEvent(eventName, ...args) { }; /** - * + * Replace non-word characters with underscore * @param {...string} args */ function slugify(...args) { @@ -551,6 +545,14 @@ function slugify(...args) { return slugs.join('_'); } +/** + * BPMN middleware locals + * @typedef {Object} BpmnMiddlewareLocals + * @property {Engines} engines - Engine factory + * @property {import('types').IStorageAdapter} adapter - Storage adapter + * @property {BpmnPrefixListener} listener - Bpmn engine listener + */ + /** * Create deployment result * @typedef {Object} CreateDeploymentResponseBody diff --git a/test/express-middleware-test.js b/test/express-middleware-test.js index 2fe4c00..c8e6ee1 100644 --- a/test/express-middleware-test.js +++ b/test/express-middleware-test.js @@ -208,29 +208,29 @@ describe('express-middleware', () => { form.append( `${deploymentName}.bpmn`, ` - - - - - - get - - - - http://example.com/\${environment.variables.businessKey}/\${environment.variables.foo} - \${environment.variables.json} - \${environment.settings.enableDummyService} - - - \${result[0].statusCode} - - - - - - `, + + + + + + get + + + + http://example.com/\${environment.variables.businessKey}/\${environment.variables.foo} + \${environment.variables.json} + \${environment.settings.enableDummyService} + + + \${result[0].statusCode} + + + + + + `, `${deploymentName}.bpmn`, ); @@ -274,29 +274,29 @@ describe('express-middleware', () => { form.append( `${deploymentName}.bpmn`, ` - - - - - - get - - - - http://example.com/\${environment.variables.businessKey}/\${environment.variables.foo} - \${environment.variables.json} - \${environment.settings.enableDummyService} - - - \${result[0].statusCode} - - - - - - `, + + + + + + get + + + + http://example.com/\${environment.variables.businessKey}/\${environment.variables.foo} + \${environment.variables.json} + \${environment.settings.enableDummyService} + + + \${result[0].statusCode} + + + + + + `, `${deploymentName}.bpmn`, ); diff --git a/test/features/idle-feature.js b/test/features/idle-feature.js index 4f929a7..2762898 100644 --- a/test/features/idle-feature.js +++ b/test/features/idle-feature.js @@ -33,17 +33,17 @@ Feature('idle engine', () => { app2, 'idle-engine', ` - - - - - - PT10M - - - - `, + + + + + + PT10M + + + + `, ); }); diff --git a/test/features/save-state-feature.js b/test/features/save-state-feature.js new file mode 100644 index 0000000..69c7631 --- /dev/null +++ b/test/features/save-state-feature.js @@ -0,0 +1,462 @@ +import request from 'supertest'; +import { LRUCache } from 'lru-cache'; + +import * as testHelpers from '../helpers/testHelpers.js'; +import { MemoryAdapter } from '../../src/index.js'; +import { StorageError } from '../../src/Errors.js'; + +const saveStateResource = testHelpers.getResource('save-state.bpmn'); +const disableSaveStateResource = testHelpers.getResource('disable-save-state.bpmn'); + +Feature('save state', () => { + Scenario('source with service task that saves state and then a timer and a message event, when messaged auto-save is enabled', () => { + /** @type {MemoryAdapter} */ + let adapter; + /** @type {ReturnType} */ + let apps; + /** @type {ReturnType} */ + let appsWithoutAutosave; + before(() => { + adapter = new MemoryAdapter(); + apps = testHelpers.horizontallyScaled(2, { adapter }); + }); + after(() => { + apps?.stop(); + appsWithoutAutosave?.stop(); + }); + + let deploymentName; + Given('a source matching scenario is deployed', async () => { + deploymentName = 'manual-save-state'; + await testHelpers.createDeployment(apps.balance(), deploymentName, saveStateResource); + }); + + let startingApp; + let timer; + let token; + let response; + When('process is started', async () => { + startingApp = apps.balance(); + timer = testHelpers.waitForProcess(startingApp, deploymentName).timer(); + + response = await request(startingApp).post(`/rest/process-definition/${deploymentName}/start`).expect(201); + + token = response.body.id; + }); + + Then('timer is started', () => { + return timer; + }); + + let completed; + When('timer times out', () => { + const [engine] = apps.getRunningByToken(token); + completed = engine.waitFor('end'); + const timer = engine.environment.timers.executing.find((t) => t.owner.id === 'timeout'); + timer.callback(); + }); + + Then('run completes', () => { + return completed; + }); + + When('attempting to signal message event', async () => { + response = await apps + .request() + .post('/rest/signal/' + token) + .send({ + id: 'ContinueMessage', + }); + }); + + Then('bad request is returned since process is already completed', () => { + expect(response.statusCode, response.text).to.equal(400); + expect(response.body.message).to.match(/already completed/); + }); + + When('process is started', async () => { + startingApp = apps.balance(); + timer = testHelpers.waitForProcess(startingApp, deploymentName).timer(); + + response = await request(startingApp).post(`/rest/process-definition/${deploymentName}/start`).expect(201); + + token = response.body.id; + }); + + Then('timer is started', () => { + return timer; + }); + + When('attempting to signal message event', async () => { + response = await apps + .request() + .post('/rest/signal/' + token) + .send({ + id: 'Message_0', + }); + }); + + Then('run completes', () => { + return completed; + }); + + When('attempting to signal message event again', async () => { + response = await apps + .request() + .post('/rest/signal/' + token) + .send({ + id: 'Message_0', + }); + }); + + Then('bad request is returned since process is already completed', () => { + expect(response.statusCode, response.text).to.equal(400); + expect(response.body.message).to.match(/already completed/); + }); + + describe('auto-save is disabled', () => { + Given('a new middleware is added with auto save disabled', () => { + appsWithoutAutosave = testHelpers.horizontallyScaled(2, { adapter, autosaveEngineState: false }); + }); + + When('process is started on manual save instance', async () => { + startingApp = appsWithoutAutosave.balance(); + timer = testHelpers.waitForProcess(startingApp, deploymentName).timer(); + + response = await request(startingApp).post(`/rest/process-definition/${deploymentName}/start`).expect(201); + + token = response.body.id; + }); + + Then('timer is started again', () => { + return timer; + }); + + When('timer times out', () => { + const [engine] = appsWithoutAutosave.getRunningByToken(token); + completed = engine.waitFor('end'); + const timer = engine.environment.timers.executing.find((t) => t.owner.id === 'timeout'); + timer.callback(); + }); + + Then('run manually saved instance completes by termination event', () => { + return completed; + }); + + And('no running engines', () => { + expect(appsWithoutAutosave.getRunningByToken(token).length).to.not.be.ok; + }); + + When('getting manually saved process state', async () => { + response = await appsWithoutAutosave + .request() + .get('/rest/state/' + token) + .expect(200); + }); + + Then('save state service is postponed', () => { + expect(response.body.postponed.find((p) => p.id === 'save-state')).to.be.ok; + }); + + When('attempting to signal message event', async () => { + startingApp = appsWithoutAutosave.balance(); + completed = testHelpers.waitForProcess(startingApp, deploymentName).end(); + + response = await request(startingApp) + .post('/rest/signal/' + token) + .send({ + id: 'Message_0', + }); + }); + + Then('OK is returned', () => { + expect(response.statusCode, response.text).to.equal(200); + }); + + And('signalled manually saved instance completes', () => { + return completed; + }); + + When('attempting to signal the same process again', async () => { + response = await appsWithoutAutosave + .request() + .post('/rest/signal/' + token) + .send({ + id: 'Message_0', + }); + }); + + Then('bad request is returned since manually saved process is already completed', () => { + expect(response.statusCode, response.text).to.equal(400); + expect(response.body.message).to.match(/already completed/); + }); + }); + }); + + Scenario('source with service tasks that disables state, saves, and then subsequently enables state', () => { + /** @type {MemoryAdapter} */ + let adapter; + /** @type {ReturnType} */ + let apps; + + function serviceFn(...args) { + args.pop()(); + } + + before(() => { + adapter = new MemoryAdapter(); + apps = testHelpers.horizontallyScaled(2, { adapter, engineOptions: { services: { serviceFn } } }); + }); + after(() => { + apps?.stop(); + }); + + let deploymentName; + Given('a source matching scenario is deployed', async () => { + deploymentName = 'disable-save-state'; + await testHelpers.createDeployment(apps.balance(), deploymentName, disableSaveStateResource); + }); + + let calledApp; + let timer; + let token; + let response; + When('process is started', async () => { + calledApp = apps.balance(); + timer = testHelpers.waitForProcess(calledApp, deploymentName).timer(); + + response = await request(calledApp).post(`/rest/process-definition/${deploymentName}/start`).expect(201); + + token = response.body.id; + }); + + Then('timer is started', () => { + return timer; + }); + + let completed; + When('timer times out', () => { + const [engine] = apps.getRunningByToken(token); + completed = engine.waitFor('end'); + const timer = engine.environment.timers.executing.find((t) => t.owner.id === 'timeout'); + timer.callback(); + }); + + Then('process completes', () => { + return completed; + }); + + When('attempting to signal message event', async () => { + calledApp = apps.balance(); + completed = testHelpers.waitForProcess(calledApp, deploymentName).end(); + + response = await request(calledApp) + .post('/rest/signal/' + token) + .send({ + id: 'Message_0', + }); + }); + + Then('OK is returned', () => { + expect(response.statusCode, response.text).to.equal(200); + }); + + And('saved instance completes', () => { + return completed; + }); + + When('attempting to signal the same process again', async () => { + response = await apps + .request() + .post('/rest/signal/' + token) + .send({ + id: 'Message_0', + }); + }); + + Then('bad request is returned since manually saved process is already completed', () => { + expect(response.statusCode, response.text).to.equal(400); + expect(response.body.message).to.match(/already completed/); + }); + + describe('manually saved process fails', () => { + function failServiceFn(...args) { + args.pop()(new Error('Volatile')); + } + + before(() => { + apps.stop(); + apps = testHelpers.horizontallyScaled(2, { + adapter, + autosaveEngineState: false, + engineOptions: { services: { serviceFn: failServiceFn } }, + }); + }); + + let errored; + When('process is started with a volatile service function', async () => { + calledApp = apps.balance(); + errored = testHelpers.waitForProcess(calledApp, deploymentName).error(); + + response = await request(calledApp).post(`/rest/process-definition/${deploymentName}/start`).expect(201); + + token = response.body.id; + }); + + Then('run fails', () => { + return errored; + }); + + When('attempting to signal the failed process', async () => { + response = await apps + .request() + .post('/rest/signal/' + token) + .send({ + id: 'Message_0', + }); + }); + + Then('bad request is returned since failed process is already completed', () => { + expect(response.statusCode, response.text).to.equal(400); + expect(response.body.message).to.match(/failed/); + }); + }); + }); + + Scenario('process fails on manual save state app', () => { + /** @type {MemoryAdapter} */ + let adapter; + /** @type {ReturnType} */ + let apps; + before(() => { + adapter = new MemoryAdapter(); + apps = testHelpers.horizontallyScaled(2, { adapter, autosaveEngineState: false }); + }); + after(() => { + apps?.stop(); + }); + + let deploymentName; + Given('a source matching scenario is deployed', async () => { + deploymentName = 'volatile-process'; + + await testHelpers.createDeployment( + apps.balance(), + deploymentName, + ` + + + + + + + `, + ); + }); + + let errored; + let calledApp; + let token; + let response; + When('process is started with a volatile service function', async () => { + calledApp = apps.balance(); + errored = testHelpers.waitForProcess(calledApp, deploymentName).error(); + + response = await request(calledApp).post(`/rest/process-definition/${deploymentName}/start`).expect(201); + + token = response.body.id; + }); + + Then('run fails', () => { + return errored; + }); + + When('attempting to get state of the failed process', async () => { + response = await apps + .request() + .get('/rest/state/' + token) + .send({ + id: 'Message_0', + }); + }); + + Then('not found is returned', () => { + expect(response.statusCode, response.text).to.equal(404); + }); + }); + + Scenario('adapter misbehaves', () => { + /** @type {import('../../types/interfaces.js').IStorageAdapter} */ + let adapter; + /** @type {ReturnType} */ + let apps; + before(() => { + adapter = new MisbehavingAdapter(); + apps = testHelpers.horizontallyScaled(2, { adapter, autosaveEngineState: false }); + }); + after(() => { + apps?.stop(); + }); + + let deploymentName; + Given('a source matching scenario is deployed', async () => { + deploymentName = 'volatile-process'; + + await testHelpers.createDeployment( + apps.balance(), + deploymentName, + ` + + + + + + + `, + ); + }); + + let errored; + let calledApp; + let token; + let response; + When('process is started with a volatile service function', async () => { + calledApp = apps.balance(); + errored = testHelpers.waitForProcess(calledApp, deploymentName).error(); + + response = await request(calledApp).post(`/rest/process-definition/${deploymentName}/start`).expect(201); + + token = response.body.id; + }); + + Then('run fails', () => { + return errored; + }); + + And('engine is not running', () => { + expect(apps.getRunningByToken(token)).to.have.length(0); + }); + + When('attempting to get state of the failed process', async () => { + response = await apps + .request() + .get('/rest/state/' + token) + .send({ + id: 'Message_0', + }); + }); + + Then('not found is returned', () => { + expect(response.statusCode, response.text).to.equal(404); + }); + }); +}); + +class MisbehavingAdapter extends MemoryAdapter { + update(type, key, value, options) { + if (!this._data.has(`${type}:${key}`)) return Promise.reject(new StorageError(`${type}:key not found`, 'MY_OWN_CODE')); + return this.upsert(type, key, value, options); + } +} diff --git a/test/features/signal-feature.js b/test/features/signal-feature.js index 273a1ea..f799967 100644 --- a/test/features/signal-feature.js +++ b/test/features/signal-feature.js @@ -162,17 +162,17 @@ Feature('signal activity', () => { apps.balance(), 'user-and-receive-process', ` - - - - - - - - - - - `, + + + + + + + + + + + `, ); }); @@ -254,4 +254,47 @@ Feature('signal activity', () => { expect(response.body).to.have.property('postponed').with.length(0); }); }); + + Scenario('signal with different type of payloads', () => { + let apps, adapter; + before(() => { + adapter = new MemoryAdapter(); + apps = horizontallyScaled(2, { adapter }); + }); + after(() => apps.stop()); + + let deploymentName; + Given('a process', () => { + deploymentName = 'user-task-process'; + return createDeployment( + apps.balance(), + deploymentName, + ` + + + + `, + ); + }); + + let token, wait; + When('when process is started', async () => { + const app = apps.balance(); + wait = waitForProcess(app, deploymentName).wait(); + + const response = await request(app).post(`/rest/process-definition/${deploymentName}/start`).expect(201); + + token = response.body.id; + }); + + Then('run is waiting', () => { + return wait; + }); + + When('task is signalled with an empty buffer', () => { + return apps.request().post(`/rest/signal/${token}`).send({ id: 0 }).expect(200); + }); + + Then('nothing happens', () => {}); + }); }); diff --git a/test/helpers/testHelpers.js b/test/helpers/testHelpers.js index d76009e..71d4b05 100644 --- a/test/helpers/testHelpers.js +++ b/test/helpers/testHelpers.js @@ -1,5 +1,6 @@ import { randomInt } from 'node:crypto'; import { createRequire } from 'node:module'; +import fs from 'node:fs'; import FormData from 'form-data'; import { extensions, extendFn, OnifySequenceFlow, OnifyTimerEventDefinition } from '@onify/flow-extensions'; @@ -23,7 +24,13 @@ const elements = { TimerEventDefinition: OnifyTimerEventDefinition, }; +/** + * Create apps with middleware options + * @param {number} [instances] Number of instances, defaults to 2 + * @param {import('../../types/interfaces.js').BpmnMiddlewareOptions} [options] + */ export function horizontallyScaled(instances = 2, options) { + /** @type {LRUCache} */ const storage = new LRUCache({ max: 1000 }); const apps = new Array(instances).fill().map(() => getAppWithExtensions({ adapter: new MemoryAdapter(storage), ...options })); @@ -49,6 +56,9 @@ export function horizontallyScaled(instances = 2, options) { return result; }, []); }, + /** + * @param {string} token + */ getRunningByToken(token) { return apps.reduce((result, app) => { const engine = app.locals.engineCache.get(token); @@ -65,6 +75,9 @@ export function horizontallyScaled(instances = 2, options) { } } +/** + * @param {import('../../types/interfaces.js').MiddlewareEngineOptions} options + */ export function getAppWithExtensions(options = {}) { const app = express(); const broker = (app.locals.broker = options.broker ?? new Broker(app)); @@ -90,6 +103,12 @@ export function getAppWithExtensions(options = {}) { return app; } +/** + * Create deployment multi-part-form + * @param {import('express').Express} app + * @param {string} name + * @param {string | Buffer} source + */ export async function createDeployment(app, name, source) { const form = new FormData(); form.append('deployment-name', name); @@ -101,6 +120,11 @@ export async function createDeployment(app, name, source) { return response; } +/** + * Wait for process event + * @param {import('express').Express} app + * @param {string} nameOrToken + */ export function waitForProcess(app, nameOrToken) { const broker = app.locals.broker; return { @@ -110,6 +134,7 @@ export function waitForProcess(app, nameOrToken) { wait, event, idle, + timer, }; function end() { @@ -120,6 +145,9 @@ export function waitForProcess(app, nameOrToken) { return event('engine.stop'); } + /** + * @param {string} activityId + */ function wait(activityId) { if (!activityId) return event('activity.wait'); return event('activity.wait', (msg) => { @@ -127,6 +155,16 @@ export function waitForProcess(app, nameOrToken) { }); } + /** + * @param {string} activityId + */ + function timer(activityId) { + if (!activityId) return event('activity.timer'); + return event('activity.timer', (msg) => { + return msg.content.id === activityId; + }); + } + function idle() { const engine = app.locals.engineCache.get(nameOrToken); if (!engine) throw new Error(`No engine with token >>${nameOrToken}<<`); @@ -136,6 +174,10 @@ export function waitForProcess(app, nameOrToken) { } } + /** + * @param {string} eventRoutingKey + * @param {CallableFunction} [expectFn] + */ function event(eventRoutingKey, expectFn) { return new Promise((resolve, reject) => { const rnd = randomInt(10000); @@ -202,6 +244,10 @@ export function waitForProcess(app, nameOrToken) { }); } + /** + * @param {import('bpmn-elements').ElementBrokerMessage} msg + * @param {CallableFunction} [expectFn] + */ function filterByNameOrToken(msg, expectFn) { const matchToken = msg.properties.token === nameOrToken || msg.properties.deployment === nameOrToken; if (matchToken && expectFn && !expectFn(msg)) return false; @@ -211,10 +257,12 @@ export function waitForProcess(app, nameOrToken) { export function fakeTimers() { let counter = 0; + /** @type {any[]} */ const registered = []; return new bpmnElements.Timers({ registered, + // @ts-ignore setTimeout: function fakeSetTimeout() { const ref = counter++; registered.push(ref); @@ -227,10 +275,24 @@ export function fakeTimers() { }); } -export function errorHandler(err, req, res, next) { +/** + * Express error handler middleware + * @param {Error} err + * @param {import('express').Request} _req + * @param {import('express').Response} res + * @param {import('express').NextFunction} next + */ +export function errorHandler(err, _req, res, next) { if (!(err instanceof Error)) return next(); // eslint-disable-next-line no-console if (process.env.TEST_ERR) console.log({ err }); if (err instanceof HttpError) return res.status(err.statusCode).send({ message: err.message }); res.status(502).send({ message: err.message }); } + +/** + * @param {string} name + */ +export function getResource(name) { + return fs.readFileSync('./test/resources/' + name); +} diff --git a/test/resources/disable-save-state.bpmn b/test/resources/disable-save-state.bpmn new file mode 100644 index 0000000..ecb1c94 --- /dev/null +++ b/test/resources/disable-save-state.bpmn @@ -0,0 +1,136 @@ + + + + + Flow_0ntsbez + + + + Flow_06plgov + Flow_1cynmye + + + Flow_1ehbeu5 + to-continue + to-timeout + + + + + ${true} + + + to-continue + Flow_1898rbq + + + + to-timeout + Flow_0zxbq2e + + PT10S + + + + Flow_0zxbq2e + + + + Flow_01zkt0m + + + + + + + + + + Flow_0ntsbez + Flow_06plgov + + + + + Flow_1cynmye + Flow_1ehbeu5 + + + Flow_1898rbq + Flow_01zkt0m + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/test/resources/save-state.bpmn b/test/resources/save-state.bpmn new file mode 100644 index 0000000..f652212 --- /dev/null +++ b/test/resources/save-state.bpmn @@ -0,0 +1,183 @@ + + + + + Flow_1nwcici + + + + + ${content.output.continuedWithMessage} + + + Flow_1nwcici + to-is-continued + + Flow_0ntsbez + + + + + + saveState + + + Flow_0ntsbez + Flow_1cynmye + + + + Flow_1cynmye + to-continue + to-timeout + + + + + ${true} + + + to-continue + Flow_1898rbq + + + + + to-timeout + Flow_0zxbq2e + + PT10S + + + + + + Flow_0zxbq2e + + + + Flow_1898rbq + + + + + + + + to-end + + + + + to-is-continued + to-enable-save-state + Flow_06oy5wv + + + ${environment.output.continued} + + + + Flow_06oy5wv + + + + to-enable-save-state + to-end + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/types/index.d.ts b/types/index.d.ts index 6147629..900e7e2 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -3,8 +3,6 @@ declare module 'bpmn-middleware' { import type { ActivityStatus, ElementMessageContent } from 'bpmn-elements'; import type { LRUCache } from 'lru-cache'; import type { Broker } from 'smqp'; - /// - /// /** * BPMN 2 Engine middleware * */ @@ -30,7 +28,7 @@ declare module 'bpmn-middleware' { extendFn?: import("moddle-context-serializer").extendFn; moddleOptions?: any; moddleContext?: import("bpmn-moddle").Definitions; - listener?: import("bpmn-engine").IListenerEmitter | import("events")<[never]>; + listener?: import("events") | import("bpmn-engine").IListenerEmitter; settings?: import("bpmn-elements").EnvironmentSettings; variables?: Record; services?: Record; @@ -43,22 +41,19 @@ declare module 'bpmn-middleware' { /** * Bound addEngineLocals */ - _addEngineLocals: (req: import('express').Request, res: import('express').Response, next: import('express').NextFunction) => void; + _addEngineLocals: (req: import("express").Request, res: import("express").Response, next: import("express").NextFunction) => void; /** * Initialize engine * */ - init(req: import('express').Request, _: import('express').Response, next: import('express').NextFunction): void; - /** - * BPMN middleware locals - * */ + init(req: import("express").Request, _: import("express").Response, next: import("express").NextFunction): void; /** * Add middleware response locals * */ - addEngineLocals(req: import('express').Request, res: import('express').Response, next: import('express').NextFunction): void; + addEngineLocals(req: import("express").Request, res: import("express").Response, next: import("express").NextFunction): void; /** * Get package version * */ - getVersion(_: import('express').Request, res: import('express').Response): import("express").Response): import("express").Response<{ name: string; @@ -74,99 +69,99 @@ declare module 'bpmn-middleware' { /** * Create deployment * */ - create(req: import('express').Request, res: import('express').Response, next: import('express').NextFunction): Promise>; + create(req: import("express").Request, res: import("express").Response, next: import("express").NextFunction): Promise>; /** * Start deployment * */ - start(req: import('express').Request<{ + start(req: import("express").Request<{ deploymentName: string; - }>, res: import('express').Response<{ + }>, res: import("express").Response<{ id: string; - }, BpmnMiddlewareLocals>, next: import('express').NextFunction): Promise, next: import("express").NextFunction): Promise>; /** * Start deployment * */ - getScript(req: import('express').Request<{ + getScript(req: import("express").Request<{ deploymentName: string; - }>, res: import('express').Response, next: import('express').NextFunction): Promise>; + }>, res: import("express").Response, next: import("express").NextFunction): Promise>; /** * Get running engines * */ - getRunning(req: import('express').Request, res: import('express').Response>, BpmnMiddlewareLocals>, next: import('express').NextFunction): Promise>; + getRunning(req: import("express").Request, res: import("express").Response>, BpmnMiddlewareLocals>, next: import("express").NextFunction): Promise>; /** * Get engine status by token * */ - getStatusByToken(req: import('express').Request, res: import('express').Response>, BpmnMiddlewareLocals>, next: import('express').NextFunction): Promise>; + getStatusByToken(req: import("express").Request, res: import("express").Response>, BpmnMiddlewareLocals>, next: import("express").NextFunction): Promise>; /** * Get engine activity status * */ - getActivityStatus(req: import('express').Request<{ + getActivityStatus(req: import("express").Request<{ token: string; activityId: string; - }>, res: import('express').Response, next: import('express').NextFunction): Promise; + }>, res: import("express").Response, next: import("express").NextFunction): Promise; /** * Signal activity * */ - signalActivity(req: import('express').Request<{ + signalActivity(req: import("express").Request<{ token: string; - }, SignalBody>, res: import('express').Response, BpmnMiddlewareLocals>, next: import('express').NextFunction): Promise>; + }, SignalBody>, res: import("express").Response, BpmnMiddlewareLocals>, next: import("express").NextFunction): Promise>; /** * Cancel activity * */ - cancelActivity(req: import('express').Request<{ + cancelActivity(req: import("express").Request<{ token: string; - }, SignalBody>, res: import('express').Response, BpmnMiddlewareLocals>, next: import('express').NextFunction): Promise>; + }, SignalBody>, res: import("express").Response, BpmnMiddlewareLocals>, next: import("express").NextFunction): Promise>; /** * Fail activity * */ - failActivity(req: import('express').Request<{ + failActivity(req: import("express").Request<{ token: string; - }, SignalBody>, res: import('express').Response, BpmnMiddlewareLocals>, next: import('express').NextFunction): Promise>; + }, SignalBody>, res: import("express").Response, BpmnMiddlewareLocals>, next: import("express").NextFunction): Promise>; /** * Resume engine by token * */ - resumeByToken(req: import('express').Request<{ + resumeByToken(req: import("express").Request<{ token: string; - }>, res: import('express').Response, BpmnMiddlewareLocals>, next: import('express').NextFunction): Promise>; + }>, res: import("express").Response, BpmnMiddlewareLocals>, next: import("express").NextFunction): Promise>; /** * Get engine state by token * */ - getStateByToken(req: import('express').Request<{ + getStateByToken(req: import("express").Request<{ token: string; - }>, res: import('express').Response>, BpmnMiddlewareLocals>, next: import('express').NextFunction): Promise>; + }>, res: import("express").Response>, BpmnMiddlewareLocals>, next: import("express").NextFunction): Promise>; /** * Delete engine by token * */ - deleteStateByToken(req: import('express').Request<{ + deleteStateByToken(req: import("express").Request<{ token: string; - }>, res: import('express').Response, next: import('express').NextFunction): Promise>; + }>, res: import("express").Response, next: import("express").NextFunction): Promise>; /** * Stop all running engines * */ - internalStopAll(_: import('express').Request, res: import('express').Response): import("express").Response>; + internalStopAll(_: import("express").Request, res: import("express").Response): import("express").Response>; /** * Stop engine by token * */ - internalStopByToken(req: import('express').Request, res: import('express').Response): import("express").Response>; + internalStopByToken(req: import("express").Request, res: import("express").Response): import("express").Response>; /** * Internal start deployment * @returns Started with id token */ - _startDeployment(deploymentName: string, options: import('bpmn-engine').BpmnEngineOptions): Promise<{ + _startDeployment(deploymentName: string, options: import("bpmn-engine").BpmnEngineOptions): Promise<{ id: string; }>; /** * Start process by call activity * */ - _startProcessByCallActivity(callActivityApi: import('bpmn-elements').Api): Promise<{ + _startProcessByCallActivity(callActivityApi: import("bpmn-elements").Api): Promise<{ id: string; }>; /** * Cancel process by call activity * */ - _cancelProcessByCallActivity(callActivityApi: import('bpmn-elements').Api): Promise; + _cancelProcessByCallActivity(callActivityApi: import("bpmn-elements").Api): Promise; /** * Post process engine run * @@ -182,13 +177,13 @@ declare module 'bpmn-middleware' { * Bpmn prefix listener * @param app Express app */ - export function BpmnPrefixListener(app: import('express').Application): void; + export function BpmnPrefixListener(app: import("express").Application): void; export class BpmnPrefixListener { /** * Bpmn prefix listener * @param app Express app */ - constructor(app: import('express').Application); + constructor(app: import("express").Application); app: import("express").Application; /** * Emit event on Express app @@ -237,7 +232,7 @@ declare module 'bpmn-middleware' { } interface BpmnMiddlewareOptions { - adapter: IStorageAdapter; + adapter?: IStorageAdapter; /** Options passed to each created engine */ engineOptions?: BpmnEngineOptions; /** Executing engines */ @@ -246,6 +241,8 @@ declare module 'bpmn-middleware' { broker?: Broker; /** Engine execution timeout before considered idle, defaults to 120000ms */ idleTimeout?: number; + /** Autosave engine state during execution */ + autosaveEngineState?: boolean; } interface MiddlewareEngineOptions extends BpmnEngineOptions { @@ -266,6 +263,7 @@ declare module 'bpmn-middleware' { interface IStorageAdapter { upsert(type: string | StorageType, key: string, value: T, options?: any): Promise; + update(type: string | StorageType, key: string, value: T, options?: any): Promise; fetch(type: string | StorageType, key: string, options?: any): Promise; delete(type: string | StorageType, key: string): Promise; query(type: string | StorageType, qs: StorageQuery, options?: any): Promise<{ records: T[]; [x: string]: any }>; @@ -325,6 +323,10 @@ declare module 'bpmn-middleware' { export const STORAGE_TYPE_STATE: "state"; export const STORAGE_TYPE_FILE: "file"; export const DEFAULT_IDLE_TIMER: 120000; + export const SAVE_STATE_ROUTINGKEY: "activity.state.save"; + export const ENABLE_SAVE_STATE_ROUTINGKEY: "activity.state.save.enable"; + export const DISABLE_SAVE_STATE_ROUTINGKEY: "activity.state.save.disable"; + export const ERR_STORAGE_KEY_NOT_FOUND: "ERR_BPMN_MIDDLEWARE_STORAGE_KEY_NOT_FOUND"; /** * Engines class * */ @@ -339,7 +341,8 @@ declare module 'bpmn-middleware' { idleTimeout: number; adapter: IStorageAdapter; engineCache: LRUCache; - __onStateMessage: (routingKey: string, message: import('smqp').Message, engine: MiddlewareEngine) => Promise; + autosaveEngineState: boolean; + __onStateMessage: (routingKey: string, message: import("smqp").Message, engine: MiddlewareEngine) => Promise; /** * Execute engine * */ @@ -347,23 +350,23 @@ declare module 'bpmn-middleware' { /** * Resume engine execution * */ - resume(token: string, listener?: import('bpmn-engine').IListenerEmitter): Promise; + resume(token: string, listener?: import("bpmn-engine").IListenerEmitter): Promise; /** * Signal activity * */ - signalActivity(token: string, listener: import('bpmn-engine').IListenerEmitter, body: any): Promise; + signalActivity(token: string, listener: import("bpmn-engine").IListenerEmitter, body: SignalBody): Promise; /** * Cancel activity * */ - cancelActivity(token: string, listener: import('bpmn-engine').IListenerEmitter, body: any): Promise; + cancelActivity(token: string, listener: import("bpmn-engine").IListenerEmitter, body: SignalBody): Promise; /** * Fail activity * */ - failActivity(token: string, listener: import('bpmn-engine').IListenerEmitter, body: any): Promise; + failActivity(token: string, listener: import("bpmn-engine").IListenerEmitter, body: SignalBody): Promise; /** * Get postponed activities by token * */ - getPostponed(token: string, listener: import('bpmn-engine').IListenerEmitter): Promise; + getPostponed(token: string, listener: import("bpmn-engine").IListenerEmitter): Promise; /** * Get engine state by token * */ @@ -409,6 +412,15 @@ declare module 'bpmn-middleware' { * Get engine status * */ getEngineStatus(engine: MiddlewareEngine): MiddlewareEngineStatus; + /** + * Create engine state + * */ + createEngineState(engine: MiddlewareEngine): MiddlewareEngineState; + /** + * Save engine state + * @param ifExists save engine state if existing state + */ + saveEngineState(engine: MiddlewareEngine, ifExists?: boolean): Promise; /** * Internal setup engine listeners * */ @@ -416,11 +428,7 @@ declare module 'bpmn-middleware' { /** * Internal on state message * */ - _onStateMessage(routingKey: string, message: import('smqp').Message, engine: MiddlewareEngine): Promise; - /** - * Internal save engine state - * */ - _saveEngineState(engine: MiddlewareEngine): Promise; + _onStateMessage(routingKey: string, message: import("smqp").Message, engine: MiddlewareEngine): Promise; /** * Internal teardown engine, remove listeners and stuff * */ @@ -439,11 +447,14 @@ declare module 'bpmn-middleware' { * Engine execution token * */ token: string; - - idleTimer: import('bpmn-elements').Timer | null | void; + /** + * Execution idle timer + * */ + idleTimer: import("bpmn-elements").Timer | null | void; engineTimers: import("bpmn-elements").RegisteredTimer; /** * Closest due time when a registered timer expires + * Ignores idle timer */ get expireAt(): Date; /** @@ -461,27 +472,38 @@ declare module 'bpmn-middleware' { * Memory adapter * */ - export function MemoryAdapter(storage?: import('lru-cache').LRUCache): void; + export function MemoryAdapter(storage?: import("lru-cache").LRUCache): void; export class MemoryAdapter { /** * Memory adapter * */ - constructor(storage?: import('lru-cache').LRUCache); + constructor(storage?: import("lru-cache").LRUCache); - storage: import('lru-cache').LRUCache; + storage: import("lru-cache").LRUCache; /** * Upsert - * + * @param type storage type + * @param key storage key + * @param value value to store + * @param options storage set options */ upsert(type: string, key: string, value: any, options?: any): Promise; + /** + * Update existing + * @param type storage type + * @param key storage key + * @param value value to store + * @param options storage set options + * */ + update(type: string, key: string, value: any, options?: any): Promise; /** * Delete * */ delete(type: string, key: string): Promise; /** * Fetch - * + * @param options Passed as fetch options to LRU cache */ fetch(type: string, key: string, options?: any): Promise; /** @@ -504,6 +526,17 @@ declare module 'bpmn-middleware' { constructor(message: string, statusCode: number); statusCode: number; } + export class StorageError extends Error { + /** + * Error with status code + * @param message Error message + * @param code Error code + */ + constructor(message: string, code: string); + code: string; + } + + export {}; } //# sourceMappingURL=index.d.ts.map \ No newline at end of file diff --git a/types/interfaces.d.ts b/types/interfaces.d.ts index 227f847..e228868 100644 --- a/types/interfaces.d.ts +++ b/types/interfaces.d.ts @@ -10,7 +10,7 @@ export enum StorageType { } export interface BpmnMiddlewareOptions { - adapter: IStorageAdapter; + adapter?: IStorageAdapter; /** Options passed to each created engine */ engineOptions?: BpmnEngineOptions; /** Executing engines */ @@ -19,6 +19,8 @@ export interface BpmnMiddlewareOptions { broker?: Broker; /** Engine execution timeout before considered idle, defaults to 120000ms */ idleTimeout?: number; + /** Autosave engine state during execution */ + autosaveEngineState?: boolean; } export interface MiddlewareEngineOptions extends BpmnEngineOptions { @@ -39,6 +41,7 @@ export interface StorageQuery { export interface IStorageAdapter { upsert(type: string | StorageType, key: string, value: T, options?: any): Promise; + update(type: string | StorageType, key: string, value: T, options?: any): Promise; fetch(type: string | StorageType, key: string, options?: any): Promise; delete(type: string | StorageType, key: string): Promise; query(type: string | StorageType, qs: StorageQuery, options?: any): Promise<{ records: T[]; [x: string]: any }>;