diff --git a/packages/caliper-core/lib/common/config/Config.js b/packages/caliper-core/lib/common/config/Config.js index ff8a52f30..f4dd111d5 100644 --- a/packages/caliper-core/lib/common/config/Config.js +++ b/packages/caliper-core/lib/common/config/Config.js @@ -65,6 +65,11 @@ const keys = { }, PrometheusPush: { Interval: 'caliper-observer-prometheuspush-interval' + }, + PrometheusManager: { + Method: 'caliper-observer-prometheusmanager-method', + Interval: 'caliper-observer-prometheusmanager-interval', + CollationCount: 'caliper-observer-prometheusmanager-collationcount' } }, Workspace: 'caliper-workspace', diff --git a/packages/caliper-core/lib/common/config/default.yaml b/packages/caliper-core/lib/common/config/default.yaml index 1bd236ba7..c83f2f3d7 100644 --- a/packages/caliper-core/lib/common/config/default.yaml +++ b/packages/caliper-core/lib/common/config/default.yaml @@ -90,6 +90,13 @@ caliper: prometheus: # Default scrape port for prometheus tx observer scrapeport: 3000 + prometheusmanager: + # Update method + method: periodic + # Default update interval for the periodic update method + interval: 1000 + # Collation count for the collate update method + collationcount: 10 # Configurations related to the logging mechanism logging: # Specifies the message structure through placeholders diff --git a/packages/caliper-core/lib/common/messages/workerMetricsMessage.js b/packages/caliper-core/lib/common/messages/workerMetricsMessage.js new file mode 100644 index 000000000..ca36748dc --- /dev/null +++ b/packages/caliper-core/lib/common/messages/workerMetricsMessage.js @@ -0,0 +1,37 @@ +/* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +'use strict'; + +const Message = require('./message'); +const MessageTypes = require('../utils/constants').Messages.Types; + +/** + * Class for the "workerMetricsMessage" message. + */ +class WorkerMetricsMessage extends Message { + /** + * Constructor for a "workerMetricsMessage" message instance. + * @param {string} sender The sender of the message. + * @param {string[]} recipients The recipients of the message. + * @param {object} content The content of the message. + * @param {string} date The date string of the message. + * @param {string} error The potential error associated with the message. + */ + constructor(sender, recipients, content, date = undefined, error = undefined) { + super(sender, recipients, MessageTypes.WorkerMetricsMessage, content, date, error); + } +} + +module.exports = WorkerMetricsMessage; diff --git a/packages/caliper-core/lib/worker/tx-observers/prometheus-manager-tx-observer.js b/packages/caliper-core/lib/worker/tx-observers/prometheus-manager-tx-observer.js new file mode 100644 index 000000000..fde97e903 --- /dev/null +++ b/packages/caliper-core/lib/worker/tx-observers/prometheus-manager-tx-observer.js @@ -0,0 +1,187 @@ +/* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +'use strict'; + +const TxObserverInterface = require('./tx-observer-interface'); +const WorkerMetricsMessage = require('../../common/messages/workerMetricsMessage'); +const CaliperUtils = require('../../common/utils/caliper-utils'); +const ConfigUtil = require('../../common/config/config-util'); + +const Logger = CaliperUtils.getLogger('prometheus-tx-observer'); + +/** + * Prometheus Manager TX observer used to send updates to the Prometheus scrape server in the manager. + */ +class PrometheusManagerTxObserver extends TxObserverInterface { + /** + * Initializes the instance. + * @param {object} options The observer configuration object. + * @param {MessengerInterface} messenger The worker messenger instance. Not used. + * @param {number} workerIndex The 0-based index of the worker node. + * @param {string} managerUuid The UUID of the manager messenger. + */ + constructor(options, messenger, workerIndex, managerUuid) { + super(messenger, workerIndex); + + this.method = (options && options.method) ? options.method : ConfigUtil.get(ConfigUtil.keys.Observer.PrometheusManager.Method); + + switch (this.method) { + case 'periodic': { + this.updateInterval = (options && options.interval) ? options.interval : ConfigUtil.get(ConfigUtil.keys.Observer.PrometheusManager.Interval); + this.intervalObject = undefined; + if (this.updateInterval <= 0) { + this.updateInterval = ConfigUtil.get(ConfigUtil.keys.Observer.PrometheusManager.Interval); + Logger.warn(`Invalid update interval specified, using default value of ${this.updateInterval}`); + } + if (options && options.collationCount) { + Logger.warn('Collation count is ignored when using periodic method'); + } + break; + } + + case 'collate' : { + this.collationCount = (options && options.collationCount) ? options.collationCount : ConfigUtil.get(ConfigUtil.keys.Observer.PrometheusManager.CollationCount); + if (this.collationCount <= 0) { + this.collationCount = ConfigUtil.get(ConfigUtil.keys.Observer.PrometheusManager.CollationCount); + Logger.warn(`Invalid collation count specified, using default value of ${this.collationCount}`); + } + if (options && options.interval) { + Logger.warn('Update interval is ignored when using collate method'); + } + break; + } + + default: { + const msg = `Unrecognised method '${this.method}' specified for prometheus manager, must be either 'collate' or 'periodic' `; + Logger.error(msg); + throw new Error(msg); + } + + } + + this.pendingMessages = []; + this.managerUuid = managerUuid; + } + /** + * Called when TXs are submitted. + * @param {number} count The number of submitted TXs. Can be greater than one for a batch of TXs. + */ + txSubmitted(count) { + const message = new WorkerMetricsMessage(this.messenger.getUUID(), [this.managerUuid], { + event: 'txSubmitted', + workerIndex: this.workerIndex, + roundIndex: this.currentRound, + roundLabel: this.roundLabel, + count: count + }); + this._appendMessage(message); + } + + /** + * Called when TXs are finished. + * @param {TxStatus | TxStatus[]} results The result information of the finished TXs. Can be a collection of results for a batch of TXs. + */ + txFinished(results) { + if (Array.isArray(results)) { + for (const result of results) { + // pass/fail status from result.GetStatus() + const message = new WorkerMetricsMessage(this.messenger.getUUID(), [this.managerUuid], { + event: 'txFinished', + workerIndex: this.workerIndex, + roundIndex: this.currentRound, + roundLabel: this.roundLabel, + status: result.GetStatus(), + latency: (result.GetTimeFinal() - result.GetTimeCreate()) / 1000 + }); + this._appendMessage(message); + } + } else { + // pass/fail status from result.GetStatus() + const message = new WorkerMetricsMessage(this.messenger.getUUID(), [this.managerUuid], { + event: 'txFinished', + workerIndex: this.workerIndex, + roundIndex: this.currentRound, + roundLabel: this.roundLabel, + status: results.GetStatus(), + latency: (results.GetTimeFinal() - results.GetTimeCreate()) / 1000 + }); + this._appendMessage(message); + } + } + + /** + * Adds message to the pending message queue + * @param {object} message Pending message + * @private + */ + async _appendMessage(message) { + this.pendingMessages.push(message); + + if (this.method === 'collate' && this.pendingMessages.length === this.collationCount) { + this._sendUpdate(); + } + } + + /** + * Sends the current aggregated statistics to the manager node when triggered by "setInterval". + * @private + */ + _sendUpdate() { + for (const message of this.pendingMessages) { + this.messenger.send(message); + } + this.pendingMessages = []; + } + + /** + * Activates the TX observer instance and starts the regular update scheduling. + * @param {number} roundIndex The 0-based index of the current round. + * @param {string} roundLabel The roundLabel name. + */ + async activate(roundIndex, roundLabel) { + await super.activate(roundIndex, roundLabel); + + if (this.method === 'periodic') { + this.intervalObject = setInterval(async () => { this._sendUpdate(); }, this.updateInterval); + } + } + + /** + * Deactivates the TX observer interface, and stops the regular update scheduling. + */ + async deactivate() { + await super.deactivate(); + + if (this.intervalObject) { + clearInterval(this.intervalObject); + this.intervalObject = undefined; + } + await this._sendUpdate(); + } +} + +/** + * Factory function for creating a PrometheusManagerTxObserver instance. + * @param {object} options The observer configuration object. + * @param {MessengerInterface} messenger The worker messenger instance. + * @param {number} workerIndex The 0-based index of the worker node. + * @param {string} managerUuid The UUID of the manager messenger. + * @return {TxObserverInterface} The observer instance. + */ +function createTxObserver(options, messenger, workerIndex, managerUuid) { + return new PrometheusManagerTxObserver(options, messenger, workerIndex, managerUuid); +} + +module.exports.createTxObserver = createTxObserver; diff --git a/packages/caliper-core/test/worker/tx-observers/prometheus-manager-tx-observer.js b/packages/caliper-core/test/worker/tx-observers/prometheus-manager-tx-observer.js new file mode 100644 index 000000000..14efd586c --- /dev/null +++ b/packages/caliper-core/test/worker/tx-observers/prometheus-manager-tx-observer.js @@ -0,0 +1,634 @@ +/* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +'use strict'; + +const chai = require('chai'); +const chaiAsPromised = require('chai-as-promised'); +chai.use(chaiAsPromised); +const mockery = require('mockery'); +const sinon = require('sinon'); +const expect = chai.expect; + +const warnLogger = sinon.stub(); +const errorLogger = sinon.stub(); + +/** + * simulate Util + */ +class Utils { + /** + * + * @param {*} path path + * @return {string} the fake path + */ + static resolvePath(path) { + return 'fake/path'; + } + + /** + * + * @return {boolean} if the process is a forked process + */ + static isForkedProcess() { + return false; + } + + /** + * + * @param {*} yaml res + * @return {string} the fake yaml + */ + static parseYaml(yaml) { + return 'yaml'; + } + + /** + * @returns {*} logger stub + */ + static getLogger() { + return { + warn: warnLogger, + error: errorLogger + }; + } + + /** + * @param {*} url url + * @returns {*} url + */ + static augmentUrlWithBasicAuth(url) { + return url; + } +} + +mockery.enable({ + warnOnReplace: false, + warnOnUnregistered: false +}); +mockery.registerMock('../../common/utils/caliper-utils', Utils); + + +describe('When using a PrometheusManagerTxObserver', () => { + + // Require here to enable mocks to be established + const PrometheusManagerTxObserver = require('../../../lib/worker/tx-observers/prometheus-manager-tx-observer'); + + after(()=> { + mockery.deregisterAll(); + mockery.disable(); + }); + + beforeEach(() => { + warnLogger.reset(); + errorLogger.reset(); + }); + + it('should set managerUuid passed through constructor', () => { + const observer = new PrometheusManagerTxObserver.createTxObserver(undefined, undefined, undefined, 'fakeUuid'); + expect(observer.managerUuid).to.equal('fakeUuid'); + }); + + it('should set the correct parameters when method is periodic', () => { + const options = { + method: 'periodic', + interval: 1000, + }; + const observer = new PrometheusManagerTxObserver.createTxObserver(options, undefined, undefined, 'fakeUuid'); + expect(observer.method).to.equal('periodic'); + expect(observer.updateInterval).to.equal(1000); + expect(observer.intervalObject).to.equal(undefined); + }); + + it('should set the correct parameters when method is collate', () => { + const options = { + method: 'collate', + collationCount: 10, + }; + const observer = new PrometheusManagerTxObserver.createTxObserver(options, undefined, undefined, 'fakeUuid'); + expect(observer.method).to.equal('collate'); + expect(observer.collationCount).to.equal(10); + }); + + it('should set the default method when options are not provided', () => { + const observer = new PrometheusManagerTxObserver.createTxObserver(undefined, undefined, undefined, 'fakeUuid'); + expect(observer.method).to.equal('periodic'); + expect(observer.updateInterval).to.equal(1000); + expect(observer.intervalObject).to.equal(undefined); + }); + + it('should throw an error if an unknown method is specified', () => { + const options = { + method: 'profjgd' + }; + expect(() => { + new PrometheusManagerTxObserver.createTxObserver(options, undefined, undefined, 'fakeUuid'); + }).to.throw(/Unrecognised method 'profjgd' specified for prometheus manager, must be either 'collate' or 'periodic'/); + }); + + it('should use default update interval and print warning when method is periodic and interval is invalid', () => { + const options = { + method: 'periodic', + interval: -1, + }; + const observer = new PrometheusManagerTxObserver.createTxObserver(options, undefined, undefined, 'fakeUuid'); + expect(observer.method).to.equal('periodic'); + expect(observer.updateInterval).to.equal(1000); + expect(observer.intervalObject).to.equal(undefined); + sinon.assert.calledOnce(warnLogger); + }); + + it('should warn when collationCount is specified but method is periodic', () => { + const options = { + method: 'periodic', + collationCount: 10, + }; + const observer = new PrometheusManagerTxObserver.createTxObserver(options, undefined, undefined, 'fakeUuid'); + expect(observer.method).to.equal('periodic'); + sinon.assert.calledOnce(warnLogger); + }); + + it('should use default collationCount and print warning when method is collate and collationCount is invalid', () => { + const options = { + method: 'collate', + collationCount: -1, + }; + const observer = new PrometheusManagerTxObserver.createTxObserver(options, undefined, undefined, 'fakeUuid'); + expect(observer.method).to.equal('collate'); + expect(observer.collationCount).to.equal(10); + sinon.assert.calledOnce(warnLogger); + }); + + it('should warn when interval is specified but method is collate', () => { + const options = { + method: 'collate', + interval: 1000, + }; + const observer = new PrometheusManagerTxObserver.createTxObserver(options, undefined, undefined, 'fakeUuid'); + expect(observer.method).to.equal('collate'); + sinon.assert.calledOnce(warnLogger); + }); + + + it('should update the pending messages array when TXs are submitted', async () => { + const senderUuid = 'senderUuid'; + const messenger = { + getUUID: sinon.stub().returns(senderUuid), + send: sinon.stub() + }; + const workerIndex = 0; + const roundIndex = 1; + const roundLabel = 'roundLabel'; + const managerUuid = 'managerUuid'; + const txCount = 1; + + const observer = new PrometheusManagerTxObserver.createTxObserver(undefined, messenger, workerIndex, managerUuid); + observer.messenger = messenger; + observer.currentRound = roundIndex; + observer.roundLabel = roundLabel; + + await observer.txSubmitted(txCount); + + expect(observer.pendingMessages).to.have.lengthOf(1); + }); + + it('should update the pending messages array when single TX is finished', async () => { + const senderUuid = 'senderUuid'; + const messenger = { + getUUID: sinon.stub().returns(senderUuid), + send: sinon.stub() + }; + const workerIndex = 0; + const roundIndex = 1; + const roundLabel = 'roundLabel'; + const managerUuid = 'managerUuid'; + const timeFinal = 1000; + const timeCreate = 0; + + const result = { + GetStatus: sinon.stub().returns('success'), + GetTimeFinal: sinon.stub().returns(timeFinal), + GetTimeCreate: sinon.stub().returns(timeCreate), + }; + + const observer = new PrometheusManagerTxObserver.createTxObserver(undefined, messenger, workerIndex, managerUuid); + observer.messenger = messenger; + observer.currentRound = roundIndex; + observer.roundLabel = roundLabel; + + await observer.txFinished(result); + + expect(observer.pendingMessages).to.have.lengthOf(1); + }); + + it('should update the pending messages array when multiple TXs are finished', async () => { + const senderUuid = 'senderUuid'; + const messenger = { + getUUID: sinon.stub().returns(senderUuid), + send: sinon.stub() + }; + const workerIndex = 0; + const roundIndex = 1; + const roundLabel = 'roundLabel'; + const managerUuid = 'managerUuid'; + const timeFinal = 1000; + const timeCreate = 0; + + const result = { + GetStatus: sinon.stub().returns('success'), + GetTimeFinal: sinon.stub().returns(timeFinal), + GetTimeCreate: sinon.stub().returns(timeCreate), + }; + + const observer = new PrometheusManagerTxObserver.createTxObserver(undefined, messenger, workerIndex, managerUuid); + observer.messenger = messenger; + observer.currentRound = roundIndex; + observer.roundLabel = roundLabel; + + await observer.txFinished([result, result]); + + expect(observer.pendingMessages).to.have.lengthOf(2); + }); + + it('should trigger update when collationCount is crossed with the collate method', async () => { + const senderUuid = 'senderUuid'; + const messenger = { + getUUID: sinon.stub().returns(senderUuid), + send: sinon.stub() + }; + const workerIndex = 0; + const roundIndex = 1; + const roundLabel = 'roundLabel'; + const managerUuid = 'managerUuid'; + const timeFinal = 1000; + const timeCreate = 0; + + const result = { + GetStatus: sinon.stub().returns('success'), + GetTimeFinal: sinon.stub().returns(timeFinal), + GetTimeCreate: sinon.stub().returns(timeCreate), + }; + + const options = { + method: 'collate', + collationCount: 2, + }; + + const observer = new PrometheusManagerTxObserver.createTxObserver(options, messenger, workerIndex, managerUuid); + observer.messenger = messenger; + observer.currentRound = roundIndex; + observer.roundLabel = roundLabel; + + observer._sendUpdate = sinon.spy(); + + await observer.txFinished([result, result]); + + expect(observer._sendUpdate).to.have.been.calledOnce; + }); + + it('should not trigger update until collation count is reached with method collate', async () => { + const senderUuid = 'senderUuid'; + const messenger = { + getUUID: sinon.stub().returns(senderUuid), + send: sinon.stub() + }; + const workerIndex = 0; + const roundIndex = 1; + const roundLabel = 'roundLabel'; + const managerUuid = 'managerUuid'; + const timeFinal = 1000; + const timeCreate = 0; + + const result = { + GetStatus: sinon.stub().returns('success'), + GetTimeFinal: sinon.stub().returns(timeFinal), + GetTimeCreate: sinon.stub().returns(timeCreate), + }; + + const options = { + method: 'collate', + collationCount: 2, + }; + + const observer = new PrometheusManagerTxObserver.createTxObserver(options, messenger, workerIndex, managerUuid); + observer.messenger = messenger; + observer.currentRound = roundIndex; + observer.roundLabel = roundLabel; + + observer._sendUpdate = sinon.spy(); + + await observer.txFinished(result); + + expect(observer._sendUpdate).to.not.have.been.called; + }); + + it('should send pending messages when collation count is reached with method collate', async () => { + const senderUuid = 'senderUuid'; + const messenger = { + getUUID: sinon.stub().returns(senderUuid), + send: sinon.spy() + }; + const workerIndex = 0; + const roundIndex = 1; + const roundLabel = 'roundLabel'; + const managerUuid = 'managerUuid'; + const timeFinal = 1000; + const timeCreate = 0; + + const result = { + GetStatus: sinon.stub().returns('success'), + GetTimeFinal: sinon.stub().returns(timeFinal), + GetTimeCreate: sinon.stub().returns(timeCreate), + }; + + const options = { + method: 'collate', + collationCount: 2, + }; + + const observer = new PrometheusManagerTxObserver.createTxObserver(options, messenger, workerIndex, managerUuid); + observer.messenger = messenger; + observer.currentRound = roundIndex; + observer.roundLabel = roundLabel; + + await observer.txFinished([result, result]); + + expect(messenger.send).to.have.been.calledTwice; + }); + + it('should clear pending messages when collation count is reached with method collate', async () => { + const senderUuid = 'senderUuid'; + const messenger = { + getUUID: sinon.stub().returns(senderUuid), + send: sinon.stub() + }; + const workerIndex = 0; + const roundIndex = 1; + const roundLabel = 'roundLabel'; + const managerUuid = 'managerUuid'; + const timeFinal = 1000; + const timeCreate = 0; + + const result = { + GetStatus: sinon.stub().returns('success'), + GetTimeFinal: sinon.stub().returns(timeFinal), + GetTimeCreate: sinon.stub().returns(timeCreate), + }; + + const options = { + method: 'collate', + collationCount: 2, + }; + + const observer = new PrometheusManagerTxObserver.createTxObserver(options, messenger, workerIndex, managerUuid); + observer.messenger = messenger; + observer.currentRound = roundIndex; + observer.roundLabel = roundLabel; + + await observer.txFinished([result, result]); + + expect(observer.pendingMessages).to.have.lengthOf(0); + }); + + it('should setup interval timer with method periodic', async () => { + const senderUuid = 'senderUuid'; + const messenger = { + getUUID: sinon.stub().returns(senderUuid), + send: sinon.stub() + }; + const workerIndex = 0; + const roundIndex = 1; + const roundLabel = 'roundLabel'; + const managerUuid = 'managerUuid'; + + const options = { + method: 'periodic', + interval: 1000, + }; + + const observer = new PrometheusManagerTxObserver.createTxObserver(options, messenger, workerIndex, managerUuid); + observer.messenger = messenger; + observer.currentRound = roundIndex; + observer.roundLabel = roundLabel; + + await observer.activate(roundIndex, roundLabel); + + expect(observer.intervalObject).to.not.be.undefined; + }); + + it('should trigger update when interval is exceeded with method periodic', async () => { + const clock = sinon.useFakeTimers(); + + const senderUuid = 'senderUuid'; + const messenger = { + getUUID: sinon.stub().returns(senderUuid), + send: sinon.stub() + }; + const workerIndex = 0; + const roundIndex = 1; + const roundLabel = 'roundLabel'; + const managerUuid = 'managerUuid'; + const timeFinal = 1000; + const timeCreate = 0; + + const result = { + GetStatus: sinon.stub().returns('success'), + GetTimeFinal: sinon.stub().returns(timeFinal), + GetTimeCreate: sinon.stub().returns(timeCreate), + }; + + const options = { + method: 'periodic', + interval: 1000, + }; + + const observer = new PrometheusManagerTxObserver.createTxObserver(options, messenger, workerIndex, managerUuid); + observer.messenger = messenger; + observer.currentRound = roundIndex; + observer.roundLabel = roundLabel; + + observer._sendUpdate = sinon.spy(); + + await observer.activate(roundIndex, roundLabel); + await observer.txFinished(result); + + expect(observer._sendUpdate).to.not.have.been.called; + + clock.tick(1000); + + expect(observer._sendUpdate).to.have.been.calledOnce; + + clock.restore(); + }); + + it('should send pending messages when interval is exceeded with method periodic', async () => { + const clock = sinon.useFakeTimers(); + + const senderUuid = 'senderUuid'; + const messenger = { + getUUID: sinon.stub().returns(senderUuid), + send: sinon.spy() + }; + const workerIndex = 0; + const roundIndex = 1; + const roundLabel = 'roundLabel'; + const managerUuid = 'managerUuid'; + const timeFinal = 1000; + const timeCreate = 0; + + const result = { + GetStatus: sinon.stub().returns('success'), + GetTimeFinal: sinon.stub().returns(timeFinal), + GetTimeCreate: sinon.stub().returns(timeCreate), + }; + + const options = { + method: 'periodic', + interval: 1000, + }; + + const observer = new PrometheusManagerTxObserver.createTxObserver(options, messenger, workerIndex, managerUuid); + observer.messenger = messenger; + observer.currentRound = roundIndex; + observer.roundLabel = roundLabel; + + await observer.activate(roundIndex, roundLabel); + await observer.txFinished(result); + + expect(messenger.send).to.not.have.been.called; + + clock.tick(1000); + + expect(messenger.send).to.have.been.calledOnce; + + clock.restore(); + }); + + it('should clear pending messages when interval is exceeded with method periodic', async () => { + const clock = sinon.useFakeTimers(); + + const senderUuid = 'senderUuid'; + const messenger = { + getUUID: sinon.stub().returns(senderUuid), + send: sinon.stub() + }; + const workerIndex = 0; + const roundIndex = 1; + const roundLabel = 'roundLabel'; + const managerUuid = 'managerUuid'; + const timeFinal = 1000; + const timeCreate = 0; + + const result = { + GetStatus: sinon.stub().returns('success'), + GetTimeFinal: sinon.stub().returns(timeFinal), + GetTimeCreate: sinon.stub().returns(timeCreate), + }; + + const options = { + method: 'periodic', + interval: 1000, + }; + + const observer = new PrometheusManagerTxObserver.createTxObserver(options, messenger, workerIndex, managerUuid); + observer.messenger = messenger; + observer.currentRound = roundIndex; + observer.roundLabel = roundLabel; + + await observer.activate(roundIndex, roundLabel); + await observer.txFinished(result); + + expect(observer.pendingMessages).to.have.lengthOf(1); + + clock.tick(1000); + + expect(observer.pendingMessages).to.have.lengthOf(0); + + clock.restore(); + }); + + it('should clear interval timer when deactivated with method periodic', async () => { + const clock = sinon.useFakeTimers(); + + const senderUuid = 'senderUuid'; + const messenger = { + getUUID: sinon.stub().returns(senderUuid), + send: sinon.stub() + }; + const workerIndex = 0; + const roundIndex = 1; + const roundLabel = 'roundLabel'; + const managerUuid = 'managerUuid'; + + const options = { + method: 'periodic', + interval: 1000, + }; + + const observer = new PrometheusManagerTxObserver.createTxObserver(options, messenger, workerIndex, managerUuid); + observer.messenger = messenger; + observer.currentRound = roundIndex; + observer.roundLabel = roundLabel; + + await observer.activate(roundIndex, roundLabel); + await observer.deactivate(); + + expect(observer.intervalObject).to.be.undefined; + + clock.restore(); + }); + + it('should send pending messages when deactivated', async () => { + const clock = sinon.useFakeTimers(); + + const senderUuid = 'senderUuid'; + const messenger = { + getUUID: sinon.stub().returns(senderUuid), + send: sinon.spy() + }; + const workerIndex = 0; + const roundIndex = 1; + const roundLabel = 'roundLabel'; + const managerUuid = 'managerUuid'; + const timeFinal = 1000; + const timeCreate = 0; + + const result = { + GetStatus: sinon.stub().returns('success'), + GetTimeFinal: sinon.stub().returns(timeFinal), + GetTimeCreate: sinon.stub().returns(timeCreate), + }; + + const options = { + method: 'periodic', + interval: 1000, + }; + + const observer = new PrometheusManagerTxObserver.createTxObserver(options, messenger, workerIndex, managerUuid); + observer.messenger = messenger; + observer.currentRound = roundIndex; + observer.roundLabel = roundLabel; + + await observer.activate(roundIndex, roundLabel); + await observer.txFinished(result); + + expect(messenger.send).to.not.have.been.called; + + await observer.deactivate(); + + expect(messenger.send).to.have.been.calledOnce; + + clock.restore(); + }); +});