Skip to content

Commit

Permalink
Add maximum-rate controller
Browse files Browse the repository at this point in the history
Signed-off-by: nkl199@yahoo.co.uk <nkl199@yahoo.co.uk>
  • Loading branch information
nklincoln committed Jun 23, 2020
1 parent 3b471b6 commit 4b23c7c
Show file tree
Hide file tree
Showing 4 changed files with 629 additions and 16 deletions.
203 changes: 203 additions & 0 deletions packages/caliper-core/lib/worker/rate-control/maxRate.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

'use strict';

const RateInterface = require('./rateInterface.js');
const Sleep = require('../../common/utils/caliper-utils').sleep;
const Logger = require('../../common/utils/caliper-utils').getLogger('maxRate.js');

/**
* Rate controller for driving at a maximum TPS.
*/
class MaxRate extends RateInterface {
/**
* Creates a new instance of the MaxRate class.
* @constructor
* @param {object} opts Options for the rate controller.
*/
constructor(opts) {
super(opts);

// Map for TPS observations
this.observedTPS = new Map();
// Map for TPS settings
this.tpsSettings = new Map();
// MPS for observed stats
this.statisticsMap = new Map();
}

/**
* Initialise the rate controller with a passed msg object
* @param {object} msg Client options with adjusted per-client load settings.
* @param {string} msg.type The type of the message. Currently always 'test'
* @param {string} msg.label The label of the round.
* @param {object} msg.rateControl The rate control to use for the round.
* @param {number} msg.trim The number/seconds of transactions to trim from the results.
* @param {object} msg.args The user supplied arguments for the round.
* @param {string} msg.cb The path of the user's callback module.
* @param {string} msg.config The path of the network's configuration file.
* @param {number} msg.numb The number of transactions to generate during the round.
* @param {number} msg.txDuration The length of the round in SECONDS.
* @param {number} msg.totalClients The number of clients executing the round.
* @param {number} msg.clients The number of clients executing the round.
* @param {object} msg.clientArgs Arguments for the client.
* @param {number} msg.clientIdx The 0-based index of the current client.
* @param {number} msg.roundIdx The 1-based index of the current round.
*
* @async
*/
async init(msg) {
this.sleepTime = 100;
this.reset = false;

// Client TPS
const startTps = this.options.tps ? this.options.tps : 5;
const startTpsPerClient = msg.totalClients ? (startTps / msg.totalClients) : startTps;
// - Store these
this.tpsSettings.set('previous', startTpsPerClient);
this.tpsSettings.set('current', startTpsPerClient);
this.tpsSettings.set('max', startTpsPerClient);

// Client TPS Step
const tpsStep = this.options.step ? this.options.step : 5;
this.step = msg.totalClients ? (tpsStep / msg.totalClients) : tpsStep;

// Minimum sample interval (default 10s)
this.sampleInterval = this.options.sampleInterval ? this.options.sampleInterval : 10;
}

/**
* Perform the rate control action based on knowledge of the start time, current index, and current results.Sleep a suitable time
* @param {number} start, generation time of the first test transaction (unused)
* @param {number} idx, sequence number of the current test transaction
* @param {Array} currentResults, current result set not yet reset by txUpdate() callback
* @param {Array} resultStats, result status set formed in txUpdate() callback
* @async
*/
async applyRateControl(start, idx, currentResults, resultStats) {

// Waiting until successful transactions occur.
if (resultStats.length < 2 || !resultStats[0].succ || !resultStats[0].create || !resultStats[0].final) {
await this.applySleepInterval();
return;
} else {
// txUpdate intervals are the only places we can detect changes. This is refreshed, and at that point
// minCreate will increase as we will be dealing with more recent submissions

// First entry
if (!this.statisticsMap.has('current')) {
this.statisticsMap.set('previous', resultStats[1]);
this.statisticsMap.set('current', resultStats[1]);
this.statisticsMap.set('sampleStart', resultStats[1].create.min);

const achievedTPS = this.retrieveIntervalTPS(resultStats);
this.observedTPS.set('current', achievedTPS);
this.observedTPS.set('max', achievedTPS);
}

// Only modify when result stats has been updated
if (this.updateOccurred(resultStats)) {

// Have we waited the required sample interval?
if (this.exceededSampleInterval(resultStats)) {
this.statisticsMap.set('current', resultStats[1]);
this.statisticsMap.set('sampleStart', resultStats[1].final.last);
const achievedTPS = this.retrieveIntervalTPS(resultStats);

// New TPS results
this.observedTPS.set('previous', this.observedTPS.get('current'));
this.observedTPS.set('current', achievedTPS);

Logger.debug(`Observed current worker TPS ${this.observedTPS.get('current')}`);
Logger.debug(`Observed previous worker TPS ${this.observedTPS.get('previous')}`);

// Action based on transaction rate trajectory (+/-)
const dTxn = this.observedTPS.get('current') - this.observedTPS.get('previous');
this.tpsSettings.set('previous', this.tpsSettings.get('current'));
if (dTxn > 0) {
// Keep ramping, try for the new max!
this.tpsSettings.set('current', this.tpsSettings.get('current') + this.step);
Logger.debug(`Increased worker TPS to ${this.tpsSettings.get('current')}`);
} else {
// Too far, back off and try smaller step size. Need to ensure we drain the backlog too.
this.reset = true;
this.tpsSettings.set('current', this.tpsSettings.get('current') - this.step);
this.step = this.step > 0.2 ? this.step / 2 : this.step;
Logger.debug(`Decreased worker TPS to ${this.tpsSettings.get('current')} and step size to ${this.step}`);
}
}
}
}

// Continue at fixed TPS within this update interval
await this.applySleepInterval();
}

/**
* Check if a txUpdate has occurred
* @param {object} resultStats the result statistics
* @returns {boolean} update boolean
*/
updateOccurred(resultStats) {
return this.statisticsMap.get('current').create.min !== resultStats[1].create.min;
}

/**
* Check if required sample time has been reached
* @param {object} resultStats the result statistics
* @returns {boolean} boolean flag
*/
exceededSampleInterval(resultStats) {
return resultStats[1].final.last - this.statisticsMap.get('sampleStart') >= this.sampleInterval;
}

/**
* TPS from the previous txUpdate interval statistics
* @param {object} resultStats the passed stats object
* @return {number} the TPS within the interval
*/
retrieveIntervalTPS(resultStats) {
const resultStatistics = resultStats[1];
return (resultStatistics.succ + resultStatistics.fail) / (resultStatistics.final.last - resultStatistics.create.min);
}

/**
* Apply the client TPS
*/
async applySleepInterval() {
const sleepTime = 1000 / this.tpsSettings.get('current');
await Sleep(sleepTime);
}

/**
* Notify the rate controller about the end of the round.
* @async
*/
async end() { }
}


/**
* Creates a new rate controller instance.
* @param {object} opts The rate controller options.
* @param {number} clientIdx The 0-based index of the client who instantiates the controller.
* @param {number} roundIdx The 1-based index of the round the controller is instantiated in.
* @return {RateInterface} The rate controller instance.
*/
function createRateController(opts, clientIdx, roundIdx) {
return new MaxRate(opts);
}

module.exports.createRateController = createRateController;
3 changes: 2 additions & 1 deletion packages/caliper-core/lib/worker/rate-control/rateControl.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ const builtInControllers = new Map([
['record-rate', './recordRate.js'],
['replay-rate', './replayRate.js'],
['linear-rate', './linearRate.js'],
['fixed-feedback-rate', './fixedFeedbackRate.js']
['fixed-feedback-rate', './fixedFeedbackRate.js'],
['maximum-rate', './maxRate.js']
]);

const RateControl = class {
Expand Down
30 changes: 15 additions & 15 deletions packages/caliper-core/test/worker/rate-control/fixedBacklog.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ describe('fixedBacklog controller implementation', () => {

});

describe('#applyRateControl', async () => {
describe('#applyRateControl', () => {

let sleepStub;
let opts = {
Expand All @@ -96,25 +96,25 @@ describe('fixedBacklog controller implementation', () => {
controller.unfinished_per_client = 30;
});

it('should sleep if resultStats.length < 2', () => {
controller.applyRateControl(null, 1, [], []);
it('should sleep if resultStats.length < 2', async () => {
await controller.applyRateControl(null, 1, [], []);
sinon.assert.calledOnce(sleepStub);
sinon.assert.calledWith(sleepStub, 1000);
});

it ('should sleep if no successful results are available', () => {
controller.applyRateControl(null, 1, [], [{}]);
it ('should sleep if no successful results are available', async () => {
await controller.applyRateControl(null, 1, [], [{}]);
sinon.assert.calledOnce(sleepStub);
sinon.assert.calledWith(sleepStub, 1000);
});

it ('should sleep if no delay results are available', () => {
controller.applyRateControl(null, 1, [], [{}]);
it ('should sleep if no delay results are available', async () => {
await controller.applyRateControl(null, 1, [], [{}]);
sinon.assert.calledOnce(sleepStub);
sinon.assert.calledWith(sleepStub, 1000);
});

it ('should not sleep if backlog transaction is below target', () => {
it ('should not sleep if backlog transaction is below target', async () => {
let idx = 50;
let currentResults = [];
let item = {
Expand All @@ -128,11 +128,11 @@ describe('fixedBacklog controller implementation', () => {
resultStats.push(item);
resultStats.push(item);

controller.applyRateControl(null, idx, currentResults, resultStats);
await controller.applyRateControl(null, idx, currentResults, resultStats);
sinon.assert.notCalled(sleepStub);
});

it ('should sleep if backlog transaction is at or above target', () => {
it ('should sleep if backlog transaction is at or above target', async () => {
let idx = 50;
let currentResults = [];
let item = {
Expand All @@ -146,12 +146,12 @@ describe('fixedBacklog controller implementation', () => {
resultStats.push(item);
resultStats.push(item);

controller.applyRateControl(null, idx, currentResults, resultStats);
await controller.applyRateControl(null, idx, currentResults, resultStats);

sinon.assert.calledOnce(sleepStub);
});

it ('should sleep for a count of the load error and the current average delay', () => {
it ('should sleep for a count of the load error and the current average delay', async () => {
let idx = 50;
let currentResults = [];
let item = {
Expand All @@ -172,7 +172,7 @@ describe('fixedBacklog controller implementation', () => {
resultStats.push(item);
resultStats.push(item);

controller.applyRateControl(null, idx, currentResults, resultStats);
await controller.applyRateControl(null, idx, currentResults, resultStats);

const completeTransactions = resultStats[0].length - currentResults.length;
const unfinished = idx - completeTransactions;
Expand All @@ -184,7 +184,7 @@ describe('fixedBacklog controller implementation', () => {
sinon.assert.calledWith(sleepStub, backlogDifference*(1000/determinedTPS));
});

it('should log the backlog error as a debug message', () => {
it('should log the backlog error as a debug message', async () => {

const FakeLogger = {
debug : () => {},
Expand All @@ -207,7 +207,7 @@ describe('fixedBacklog controller implementation', () => {
resultStats.push(item);
resultStats.push(item);

controller.applyRateControl(null, idx, currentResults, resultStats);
await controller.applyRateControl(null, idx, currentResults, resultStats);

const completeTransactions = resultStats[0].length - currentResults.length;
const unfinshed = idx - completeTransactions;
Expand Down
Loading

0 comments on commit 4b23c7c

Please sign in to comment.