diff --git a/packages/caliper-core/lib/worker/rate-control/maxRate.js b/packages/caliper-core/lib/worker/rate-control/maxRate.js new file mode 100644 index 0000000000..28ade55cff --- /dev/null +++ b/packages/caliper-core/lib/worker/rate-control/maxRate.js @@ -0,0 +1,203 @@ +/* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +'use strict'; + +const RateInterface = require('./rateInterface.js'); +const Sleep = require('../../common/utils/caliper-utils').sleep; +const Logger = require('../../common/utils/caliper-utils').getLogger('maxRate.js'); + +/** + * Rate controller for driving at a maximum TPS. + */ +class MaxRate extends RateInterface { + /** + * Creates a new instance of the MaxRate class. + * @constructor + * @param {object} opts Options for the rate controller. + */ + constructor(opts) { + super(opts); + + // Map for TPS observations + this.observedTPS = new Map(); + // Map for TPS settings + this.tpsSettings = new Map(); + // MPS for observed stats + this.statisticsMap = new Map(); + } + + /** + * Initialise the rate controller with a passed msg object + * @param {object} msg Client options with adjusted per-client load settings. + * @param {string} msg.type The type of the message. Currently always 'test' + * @param {string} msg.label The label of the round. + * @param {object} msg.rateControl The rate control to use for the round. + * @param {number} msg.trim The number/seconds of transactions to trim from the results. + * @param {object} msg.args The user supplied arguments for the round. + * @param {string} msg.cb The path of the user's callback module. + * @param {string} msg.config The path of the network's configuration file. + * @param {number} msg.numb The number of transactions to generate during the round. + * @param {number} msg.txDuration The length of the round in SECONDS. + * @param {number} msg.totalClients The number of clients executing the round. + * @param {number} msg.clients The number of clients executing the round. + * @param {object} msg.clientArgs Arguments for the client. + * @param {number} msg.clientIdx The 0-based index of the current client. + * @param {number} msg.roundIdx The 1-based index of the current round. + * + * @async + */ + async init(msg) { + this.sleepTime = 100; + this.reset = false; + + // Client TPS + const startTps = this.options.tps ? this.options.tps : 5; + const startTpsPerClient = msg.totalClients ? (startTps / msg.totalClients) : startTps; + // - Store these + this.tpsSettings.set('previous', startTpsPerClient); + this.tpsSettings.set('current', startTpsPerClient); + this.tpsSettings.set('max', startTpsPerClient); + + // Client TPS Step + const tpsStep = this.options.step ? this.options.step : 5; + this.step = msg.totalClients ? (tpsStep / msg.totalClients) : tpsStep; + + // Minimum sample interval (default 10s) + this.sampleInterval = this.options.sampleInterval ? this.options.sampleInterval : 10; + } + + /** + * Perform the rate control action based on knowledge of the start time, current index, and current results.Sleep a suitable time + * @param {number} start, generation time of the first test transaction (unused) + * @param {number} idx, sequence number of the current test transaction + * @param {Array} currentResults, current result set not yet reset by txUpdate() callback + * @param {Array} resultStats, result status set formed in txUpdate() callback + * @async + */ + async applyRateControl(start, idx, currentResults, resultStats) { + + // Waiting until successful transactions occur. + if (resultStats.length < 2 || !resultStats[0].succ || !resultStats[0].create || !resultStats[0].final) { + await this.applySleepInterval(); + return; + } else { + // txUpdate intervals are the only places we can detect changes. This is refreshed, and at that point + // minCreate will increase as we will be dealing with more recent submissions + + // First entry + if (!this.statisticsMap.has('current')) { + this.statisticsMap.set('previous', resultStats[1]); + this.statisticsMap.set('current', resultStats[1]); + this.statisticsMap.set('sampleStart', resultStats[1].create.min); + + const achievedTPS = this.retrieveIntervalTPS(resultStats); + this.observedTPS.set('current', achievedTPS); + this.observedTPS.set('max', achievedTPS); + } + + // Only modify when result stats has been updated + if (this.updateOccurred(resultStats)) { + + // Have we waited the required sample interval? + if (this.exceededSampleInterval(resultStats)) { + this.statisticsMap.set('current', resultStats[1]); + this.statisticsMap.set('sampleStart', resultStats[1].final.last); + const achievedTPS = this.retrieveIntervalTPS(resultStats); + + // New TPS results + this.observedTPS.set('previous', this.observedTPS.get('current')); + this.observedTPS.set('current', achievedTPS); + + Logger.debug(`Observed current worker TPS ${this.observedTPS.get('current')}`); + Logger.debug(`Observed previous worker TPS ${this.observedTPS.get('previous')}`); + + // Action based on transaction rate trajectory (+/-) + const dTxn = this.observedTPS.get('current') - this.observedTPS.get('previous'); + this.tpsSettings.set('previous', this.tpsSettings.get('current')); + if (dTxn > 0) { + // Keep ramping, try for the new max! + this.tpsSettings.set('current', this.tpsSettings.get('current') + this.step); + Logger.debug(`Increased worker TPS to ${this.tpsSettings.get('current')}`); + } else { + // Too far, back off and try smaller step size. Need to ensure we drain the backlog too. + this.reset = true; + this.tpsSettings.set('current', this.tpsSettings.get('current') - this.step); + this.step = this.step > 0.2 ? this.step / 2 : this.step; + Logger.debug(`Decreased worker TPS to ${this.tpsSettings.get('current')} and step size to ${this.step}`); + } + } + } + } + + // Continue at fixed TPS within this update interval + await this.applySleepInterval(); + } + + /** + * Check if a txUpdate has occurred + * @param {object} resultStats the result statistics + * @returns {boolean} update boolean + */ + updateOccurred(resultStats) { + return this.statisticsMap.get('current').create.min !== resultStats[1].create.min; + } + + /** + * Check if required sample time has been reached + * @param {object} resultStats the result statistics + * @returns {boolean} boolean flag + */ + exceededSampleInterval(resultStats) { + return resultStats[1].final.last - this.statisticsMap.get('sampleStart') >= this.sampleInterval; + } + + /** + * TPS from the previous txUpdate interval statistics + * @param {object} resultStats the passed stats object + * @return {number} the TPS within the interval + */ + retrieveIntervalTPS(resultStats) { + const resultStatistics = resultStats[1]; + return (resultStatistics.succ + resultStatistics.fail) / (resultStatistics.final.last - resultStatistics.create.min); + } + + /** + * Apply the client TPS + */ + async applySleepInterval() { + const sleepTime = 1000 / this.tpsSettings.get('current'); + await Sleep(sleepTime); + } + + /** + * Notify the rate controller about the end of the round. + * @async + */ + async end() { } +} + + +/** + * Creates a new rate controller instance. + * @param {object} opts The rate controller options. + * @param {number} clientIdx The 0-based index of the client who instantiates the controller. + * @param {number} roundIdx The 1-based index of the round the controller is instantiated in. + * @return {RateInterface} The rate controller instance. + */ +function createRateController(opts, clientIdx, roundIdx) { + return new MaxRate(opts); +} + +module.exports.createRateController = createRateController; diff --git a/packages/caliper-core/lib/worker/rate-control/rateControl.js b/packages/caliper-core/lib/worker/rate-control/rateControl.js index 58c23e0f95..24d0064e77 100644 --- a/packages/caliper-core/lib/worker/rate-control/rateControl.js +++ b/packages/caliper-core/lib/worker/rate-control/rateControl.js @@ -24,7 +24,8 @@ const builtInControllers = new Map([ ['record-rate', './recordRate.js'], ['replay-rate', './replayRate.js'], ['linear-rate', './linearRate.js'], - ['fixed-feedback-rate', './fixedFeedbackRate.js'] + ['fixed-feedback-rate', './fixedFeedbackRate.js'], + ['maximum-rate', './maxRate.js'] ]); const RateControl = class { diff --git a/packages/caliper-core/test/worker/rate-control/fixedBacklog.js b/packages/caliper-core/test/worker/rate-control/fixedBacklog.js index 645378db83..d1cafc1ff3 100644 --- a/packages/caliper-core/test/worker/rate-control/fixedBacklog.js +++ b/packages/caliper-core/test/worker/rate-control/fixedBacklog.js @@ -79,7 +79,7 @@ describe('fixedBacklog controller implementation', () => { }); - describe('#applyRateControl', async () => { + describe('#applyRateControl', () => { let sleepStub; let opts = { @@ -96,25 +96,25 @@ describe('fixedBacklog controller implementation', () => { controller.unfinished_per_client = 30; }); - it('should sleep if resultStats.length < 2', () => { - controller.applyRateControl(null, 1, [], []); + it('should sleep if resultStats.length < 2', async () => { + await controller.applyRateControl(null, 1, [], []); sinon.assert.calledOnce(sleepStub); sinon.assert.calledWith(sleepStub, 1000); }); - it ('should sleep if no successful results are available', () => { - controller.applyRateControl(null, 1, [], [{}]); + it ('should sleep if no successful results are available', async () => { + await controller.applyRateControl(null, 1, [], [{}]); sinon.assert.calledOnce(sleepStub); sinon.assert.calledWith(sleepStub, 1000); }); - it ('should sleep if no delay results are available', () => { - controller.applyRateControl(null, 1, [], [{}]); + it ('should sleep if no delay results are available', async () => { + await controller.applyRateControl(null, 1, [], [{}]); sinon.assert.calledOnce(sleepStub); sinon.assert.calledWith(sleepStub, 1000); }); - it ('should not sleep if backlog transaction is below target', () => { + it ('should not sleep if backlog transaction is below target', async () => { let idx = 50; let currentResults = []; let item = { @@ -128,11 +128,11 @@ describe('fixedBacklog controller implementation', () => { resultStats.push(item); resultStats.push(item); - controller.applyRateControl(null, idx, currentResults, resultStats); + await controller.applyRateControl(null, idx, currentResults, resultStats); sinon.assert.notCalled(sleepStub); }); - it ('should sleep if backlog transaction is at or above target', () => { + it ('should sleep if backlog transaction is at or above target', async () => { let idx = 50; let currentResults = []; let item = { @@ -146,12 +146,12 @@ describe('fixedBacklog controller implementation', () => { resultStats.push(item); resultStats.push(item); - controller.applyRateControl(null, idx, currentResults, resultStats); + await controller.applyRateControl(null, idx, currentResults, resultStats); sinon.assert.calledOnce(sleepStub); }); - it ('should sleep for a count of the load error and the current average delay', () => { + it ('should sleep for a count of the load error and the current average delay', async () => { let idx = 50; let currentResults = []; let item = { @@ -172,7 +172,7 @@ describe('fixedBacklog controller implementation', () => { resultStats.push(item); resultStats.push(item); - controller.applyRateControl(null, idx, currentResults, resultStats); + await controller.applyRateControl(null, idx, currentResults, resultStats); const completeTransactions = resultStats[0].length - currentResults.length; const unfinished = idx - completeTransactions; @@ -184,7 +184,7 @@ describe('fixedBacklog controller implementation', () => { sinon.assert.calledWith(sleepStub, backlogDifference*(1000/determinedTPS)); }); - it('should log the backlog error as a debug message', () => { + it('should log the backlog error as a debug message', async () => { const FakeLogger = { debug : () => {}, @@ -207,7 +207,7 @@ describe('fixedBacklog controller implementation', () => { resultStats.push(item); resultStats.push(item); - controller.applyRateControl(null, idx, currentResults, resultStats); + await controller.applyRateControl(null, idx, currentResults, resultStats); const completeTransactions = resultStats[0].length - currentResults.length; const unfinshed = idx - completeTransactions; diff --git a/packages/caliper-core/test/worker/rate-control/maxRate.js b/packages/caliper-core/test/worker/rate-control/maxRate.js new file mode 100644 index 0000000000..8ec42afc51 --- /dev/null +++ b/packages/caliper-core/test/worker/rate-control/maxRate.js @@ -0,0 +1,409 @@ +/* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +'use strict'; + +const rewire = require('rewire'); +const MaxRate = rewire('../../../lib/worker/rate-control/maxRate'); + +const chai = require('chai'); +chai.should(); +const sinon = require('sinon'); + +describe('maxRate controller implementation', () => { + + let sandbox; + + beforeEach(() => { + sandbox = sinon.createSandbox(); + }); + + afterEach( () => { + sandbox.restore(); + }); + + describe('#init', () => { + it('should set a default starting TPS for single or multiple workers', () => { + + let opts = {}; + let msg = {}; + let controller = new MaxRate.createRateController(opts); + controller.init(msg); + controller.tpsSettings.get('current').should.equal(5); + + msg.totalClients = 2; + controller.init(msg); + controller.tpsSettings.get('current').should.equal(2.5); + }); + + it('should set a starting TPS for single or multiple workers', () => { + let opts = { + tps: 10 + }; + let msg = {}; + let controller = new MaxRate.createRateController(opts); + controller.init(msg); + controller.tpsSettings.get('current').should.equal(10); + + msg.totalClients = 2; + controller.init(msg); + controller.tpsSettings.get('current').should.equal(5); + }); + + it('should set a default step size for single or multiple workers', () => { + let opts = {}; + let msg = {}; + let controller = new MaxRate.createRateController(opts); + controller.init(msg); + controller.step.should.equal(5); + + msg.totalClients = 2; + controller.init(msg); + controller.step.should.equal(2.5); + }); + + it('should set a specified step size for single or multiple workers', () => { + let opts = { + step: 10 + }; + let msg = {}; + let controller = new MaxRate.createRateController(opts); + controller.init(msg); + controller.step.should.equal(10); + + msg.totalClients = 2; + controller.init(msg); + controller.step.should.equal(5); + }); + + it('should set a default sample interval for single or multiple workers', () => { + let opts = {}; + let msg = {}; + let controller = new MaxRate.createRateController(opts); + controller.init(msg); + controller.sampleInterval.should.equal(10); + + msg.totalClients = 2; + controller.init(msg); + controller.sampleInterval.should.equal(10); + }); + + it('should set a sample interval if specified for single or multiple workers', () => { + let opts = { + sampleInterval: 20 + }; + let msg = {}; + let controller = new MaxRate.createRateController(opts); + controller.init(msg); + controller.sampleInterval.should.equal(20); + + msg.totalClients = 2; + controller.init(msg); + controller.sampleInterval.should.equal(20); + }); + + }); + + describe('#applyRateControl', async () => { + + let sleepStub; + let controller; + let opts = {}; + + beforeEach(() => { + controller = new MaxRate.createRateController(opts); + sleepStub = sandbox.stub(controller, 'applySleepInterval'); + }); + + it('should sleep if resultStats.length < 2',async () => { + let updateSpy = sandbox.spy(controller, 'updateOccurred'); + await controller.applyRateControl(null, 1, [], [{}]); + + sinon.assert.notCalled(updateSpy); + sinon.assert.calledOnce(sleepStub); + }); + + it('should sleep if no successful results are available', async () => { + let updateSpy = sandbox.spy(controller, 'updateOccurred'); + await controller.applyRateControl(null, 1, [], [{}, {}]); + + sinon.assert.notCalled(updateSpy); + sinon.assert.calledOnce(sleepStub); + }); + + it('should sleep if no successful results are available', async () => { + let updateSpy = sandbox.spy(controller, 'updateOccurred'); + await controller.applyRateControl(null, 1, [], [{ucc: 12}, {}]); + + sinon.assert.notCalled(updateSpy); + sinon.assert.calledOnce(sleepStub); + }); + + it('should initialize internal stats and tps maps on first pass', async () => { + let idx = 50; + let currentResults = []; + let item = { + succ: 5, + create: { + min: 100 + }, + final: { + last: 200 + } + }; + const resultStats = []; + resultStats.push(item); + resultStats.push(item); + + let exceededSampleIntervalSpy = sandbox.spy(controller, 'exceededSampleInterval'); + sandbox.stub(controller, 'updateOccurred').returns(false); + sandbox.stub(controller, 'retrieveIntervalTPS').returns(123); + + controller.init({}); + await controller.applyRateControl(null, idx, currentResults, resultStats); + + // should have internal values + controller.statisticsMap.get('previous').should.deep.equal(item); + controller.statisticsMap.get('current').should.deep.equal(item); + controller.statisticsMap.get('sampleStart').should.equal(100); + + controller.observedTPS.get('max').should.equal(123); + controller.observedTPS.get('current').should.equal(123); + + // Should not have processed update + sinon.assert.notCalled(exceededSampleIntervalSpy); + }); + + it('should ramp the driven TPS if current TPS > previous TPS', async () => { + let idx = 50; + let currentResults = []; + let item = { + succ: 5, + create: { + min: 100 + }, + final: { + last: 200 + } + }; + const resultStats = []; + resultStats.push(item); + resultStats.push(item); + + sandbox.stub(controller, 'updateOccurred').returns(true); + sandbox.stub(controller, 'exceededSampleInterval').returns(true); + sandbox.stub(controller, 'retrieveIntervalTPS').returns(10); + + controller.init({}); + controller.statisticsMap.set('current', {}); + controller.observedTPS.set('current', 5); + + await controller.applyRateControl(null, idx, currentResults, resultStats); + + controller.tpsSettings.get('current').should.equal(10); + }); + + it('should drop the driven TPS and halve the step size if current TPS < previous TPS', async () => { + let idx = 50; + let currentResults = []; + let item = { + succ: 5, + create: { + min: 100 + }, + final: { + last: 200 + } + }; + const resultStats = []; + resultStats.push(item); + resultStats.push(item); + + sandbox.stub(controller, 'updateOccurred').returns(true); + sandbox.stub(controller, 'exceededSampleInterval').returns(true); + sandbox.stub(controller, 'retrieveIntervalTPS').returns(10); + + controller.init({}); + controller.statisticsMap.set('current', {}); + controller.observedTPS.set('current', 11); + controller.step = 5; + controller.tpsSettings.set('current',20); + await controller.applyRateControl(null, idx, currentResults, resultStats); + + controller.tpsSettings.get('current').should.equal(15); + controller.step.should.equal(2.5); + }); + + it('should drop the driven TPS only if current TPS < previous TPS and the step is below a threshold', async () => { + let idx = 50; + let currentResults = []; + let item = { + succ: 5, + create: { + min: 100 + }, + final: { + last: 200 + } + }; + const resultStats = []; + resultStats.push(item); + resultStats.push(item); + + sandbox.stub(controller, 'updateOccurred').returns(true); + sandbox.stub(controller, 'exceededSampleInterval').returns(true); + sandbox.stub(controller, 'retrieveIntervalTPS').returns(10); + + controller.init({}); + controller.statisticsMap.set('current', {}); + controller.observedTPS.set('current', 11); + controller.step = 0.1; + controller.tpsSettings.set('current',20); + await controller.applyRateControl(null, idx, currentResults, resultStats); + + controller.tpsSettings.get('current').should.equal(19.9); + controller.step.should.equal(0.1); + }); + + }); + + describe('#updateOccurred', () => { + + let item = { + succ: 5, + create: { + min: 100 + }, + final: { + last: 200 + } + }; + const resultStats = []; + resultStats.push(item); + resultStats.push(item); + + it('should return true if the stored stats "create.min" differs from the passed', () => { + + let opts = {}; + let msg = {}; + let controller = new MaxRate.createRateController(opts); + controller.init(msg); + controller.statisticsMap.set('current', {create: { + min: 123 + }}); + + controller.updateOccurred(resultStats).should.equal(true); + }); + + it('should return false if the stored stats "create.min" is the same as the passed', () => { + + let opts = {}; + let msg = {}; + let controller = new MaxRate.createRateController(opts); + controller.init(msg); + controller.statisticsMap.set('current', {create: { + min: 100 + }}); + controller.updateOccurred(resultStats).should.equal(false); + }); + + }); + + describe('#exceededSampleInterval', () => { + + let item = { + succ: 5, + create: { + min: 100 + }, + final: { + last: 2000 + } + }; + const resultStats = []; + resultStats.push(item); + resultStats.push(item); + + it('should return true if the sample time is less than the elapsed time', () => { + + let opts = {}; + let msg = {}; + let controller = new MaxRate.createRateController(opts); + controller.init(msg); + controller.statisticsMap.set('sampleStart', 0); + + controller.exceededSampleInterval(resultStats).should.equal(true); + }); + + it('should return false if the sample time is greater than the elapsed time', () => { + + let opts = {}; + let msg = {}; + let controller = new MaxRate.createRateController(opts); + controller.init(msg); + controller.statisticsMap.set('sampleStart', 1999); + controller.exceededSampleInterval(resultStats).should.equal(false); + }); + + }); + + describe('#retrieveIntervalTPS', () => { + + let item = { + succ: 50, + fail: 50, + create: { + min: 10 + }, + final: { + last: 20 + } + }; + const resultStats = []; + resultStats.push(item); + resultStats.push(item); + + it('should return the TPS from the interval', () => { + + let opts = {}; + let msg = {}; + let controller = new MaxRate.createRateController(opts); + controller.init(msg); + controller.statisticsMap.set('sampleStart', 0); + + controller.retrieveIntervalTPS(resultStats).should.equal(10); + }); + + }); + + describe('#applySleepInterval', () => { + + it('should apply the global TPS setting as a sleep interval', () => { + + let opts = {}; + let msg = {}; + let sleepStub = sinon.stub(); + MaxRate.__set__('Sleep', sleepStub); + let controller = new MaxRate.createRateController(opts); + controller.init(msg); + controller.statisticsMap.set('sampleStart', 0); + + controller.applySleepInterval(); + // 200 = 1000/default + sinon.assert.calledOnceWithExactly(sleepStub, 200); + }); + + }); + +});