Skip to content

Commit

Permalink
add prometheus scrape target tx observer
Browse files Browse the repository at this point in the history
Signed-off-by: nkl199@yahoo.co.uk <nkl199@yahoo.co.uk>
  • Loading branch information
nklincoln committed Aug 27, 2020
1 parent c529ade commit 80a778c
Show file tree
Hide file tree
Showing 10 changed files with 234 additions and 37 deletions.
3 changes: 2 additions & 1 deletion packages/caliper-core/lib/common/config/Config.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ const keys = {
}
},
Monitor: {
DefaultInterval: 'caliper-monitor-default-interval'
DefaultInterval: 'caliper-monitor-default-interval',
PrometheusScrapePort: 'caliper-monitor-prometheus-scrape-port'
},
Workspace: 'caliper-workspace',
ProjectConfig: 'caliper-projectconfig',
Expand Down
2 changes: 2 additions & 0 deletions packages/caliper-core/lib/common/config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ caliper:
monitor:
# Default update interval
defaultinterval: 5000
# Default scrape port for prometheus tx observer
prometheusscrapeport: 3000
# Configurations related to the logging mechanism
logging:
# Specifies the message structure through placeholders
Expand Down
6 changes: 3 additions & 3 deletions packages/caliper-core/lib/worker/caliper-worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ class CaliperWorker {
this.workerIndex = workerIndex;
this.messenger = messenger;

this.internalTxObserver = new InternalTxObserver(messenger, managerUuid);
this.txObserverDispatch = new TxObserverDispatch(messenger, this.internalTxObserver, managerUuid);
this.internalTxObserver = new InternalTxObserver(messenger, managerUuid, workerIndex);
this.txObserverDispatch = new TxObserverDispatch(messenger, this.internalTxObserver, managerUuid, workerIndex);

// forward adapter notifications to the TX dispatch observer
const self = this;
Expand Down Expand Up @@ -186,7 +186,7 @@ class CaliperWorker {

// Activate dispatcher
Logger.debug(`Worker #${this.workerIndex} activating TX observer dispatch`);
await this.txObserverDispatch.activate(this.workerIndex, roundIndex, roundLabel);
await this.txObserverDispatch.activate(roundIndex, roundLabel);

// Configure
Logger.debug(`Worker #${this.workerIndex} creating rate controller`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ class InternalTxObserver extends TxObserverInterface {
* Initializes the observer instance.
* @param {MessengerInterface} messenger The worker messenger instance.
* @param {string} managerUuid The UUID of the messenger for message sending.
* @param {number} workerIndex The 0-based index of the worker node.
*/
constructor(messenger, managerUuid) {
super(messenger);
constructor(messenger, managerUuid, workerIndex) {
super(messenger, workerIndex);
this.updateInterval = ConfigUtil.get(ConfigUtil.keys.Worker.Update.Interval);
this.intervalObject = undefined;
this.messengerUUID = messenger.getUUID();
Expand All @@ -49,12 +50,11 @@ class InternalTxObserver extends TxObserverInterface {

/**
* Activates the TX observer instance and starts the regular update scheduling.
* @param {number} workerIndex The 0-based index of the worker node.
* @param {number} roundIndex The 0-based index of the current round.
* @param {string} roundLabel The roundLabel name.
*/
async activate(workerIndex, roundIndex, roundLabel) {
await super.activate(workerIndex, roundIndex, roundLabel);
async activate(roundIndex, roundLabel) {
await super.activate(roundIndex, roundLabel);
this.intervalObject = setInterval(async () => { await this._sendUpdate(); }, this.updateInterval);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ class LoggingTxObserver extends TxObserverInterface{
* Initializes the observer instance.
* @param {object} options The log observer configuration object.
* @param {MessengerInterface} messenger The worker messenger instance. Not used.
* @param {number} workerIndex The 0-based index of the worker node.
*/
constructor(options, messenger) {
super(messenger);
let logger = CaliperUtils.getLogger(options.loggerModuleName || 'txinfo');
constructor(options, messenger, workerIndex) {
super(messenger, workerIndex);
let logger = CaliperUtils.getLogger(options.loggerModuleName || 'txInfo');
this.logFunction = logger[options.messageLevel || 'info'];
}

Expand Down Expand Up @@ -70,10 +71,11 @@ class LoggingTxObserver extends TxObserverInterface{
* Factory function for creating a LoggingTxObserver instance.
* @param {object} options The logging observer configuration object.
* @param {MessengerInterface} messenger The worker messenger instance. Not used.
* @param {number} workerIndex The 0-based index of the worker node.
* @return {TxObserverInterface} The observer instance.
*/
function createTxObserver(options, messenger) {
return new LoggingTxObserver(options, messenger);
function createTxObserver(options, messenger, workerIndex) {
return new LoggingTxObserver(options, messenger, workerIndex);
}

module.exports.createTxObserver = createTxObserver;
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
'use strict';

const TxObserverInterface = require('./tx-observer-interface');
// const TxResetMessage = require('../../common/messages/txResetMessage');
const PrometheusClient = require('../../common/prometheus/prometheus-push-client');
const CaliperUtils = require('../../common/utils/caliper-utils');

Expand All @@ -27,9 +26,10 @@ class PrometheusPushTxObserver extends TxObserverInterface {
* Initializes the observer instance.
* @param {object} options The observer configuration object.
* @param {MessengerInterface} messenger The worker messenger instance.
* @param {number} workerIndex The 0-based index of the worker node.
*/
constructor(options, messenger) {
super(messenger);
constructor(options, messenger, workerIndex) {
super(messenger, workerIndex);
this.sendInterval = options && options.sendInterval || 1000;
this.intervalObject = undefined;

Expand Down Expand Up @@ -76,12 +76,11 @@ class PrometheusPushTxObserver extends TxObserverInterface {

/**
* Activates the TX observer instance and starts the regular update scheduling.
* @param {number} workerIndex The 0-based index of the worker node.
* @param {number} roundIndex The 0-based index of the current round.
* @param {string} roundLabel The roundLabel name.
*/
async activate(workerIndex, roundIndex, roundLabel) {
await super.activate(workerIndex, roundIndex, roundLabel);
async activate(roundIndex, roundLabel) {
await super.activate(roundIndex, roundLabel);
this.intervalObject = setInterval(async () => { await this._sendUpdate(); }, this.sendInterval);
}

Expand Down Expand Up @@ -112,10 +111,11 @@ class PrometheusPushTxObserver extends TxObserverInterface {
* Factory function for creating a PrometheusPushTxObserver instance.
* @param {object} options The observer configuration object.
* @param {MessengerInterface} messenger The worker messenger instance.
* @param {number} workerIndex The 0-based index of the worker node.
* @return {TxObserverInterface} The observer instance.
*/
function createTxObserver(options, messenger) {
return new PrometheusPushTxObserver(options, messenger);
function createTxObserver(options, messenger, workerIndex) {
return new PrometheusPushTxObserver(options, messenger, workerIndex);
}

module.exports.createTxObserver = createTxObserver;
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

'use strict';

const CaliperUtils = require('../../common/utils/caliper-utils');
const ConfigUtil = require('../../common/config/config-util');
const TxObserverInterface = require('./tx-observer-interface');

const express = require('express');
const appServer = express();
const prometheusClient = require('prom-client');
const prometheusGcStats = require('prometheus-gc-stats');

const Logger = CaliperUtils.getLogger('prometheus-tx-observer');

/**
* Prometheus TX observer used to maintain Prometheus metrics by acting as a scrape target.
*/
class PrometheusTxObserver extends TxObserverInterface {
/**
* Initializes the instance.
* @param {object} options The observer configuration object.
* @param {MessengerInterface} messenger The worker messenger instance. Not used.
* @param {number} workerIndex The 0-based index of the worker node.
*/
constructor(options, messenger, workerIndex) {
super(messenger, workerIndex);
this.metricPath = options.metricPath || '/metrics';
this.scrapePort = Number(options.scrapePort) || ConfigUtil.get(ConfigUtil.keys.Monitor.PrometheusScrapePort);
if (CaliperUtils.isForkedProcess()) {
this.scrapePort += workerIndex;
}
this.processMetricCollectInterval = options.processMetricCollectInterval;
this.defaultLabels = options.defaultLabels || {};

Logger.debug(`Configuring Prometheus scrape server for worker ${workerIndex} on port ${this.scrapePort}, with metrics exposed on ${this.metricPath} endpoint`);

// do not use global registry to avoid conflicts with other potential prom-based observers
this.registry = new prometheusClient.Registry();

// automatically apply default internal and user supplied labels
this.defaultLabels.workerIndex = this.workerIndex;
this.defaultLabels.roundIndex = this.currentRound;
this.defaultLabels.roundLabel = this.roundLabel;
this.registry.setDefaultLabels(this.defaultLabels);

// Exposed metrics
this.counterTxSubmitted = new prometheusClient.Counter({
name: 'caliper_tx_submitted',
help: 'The total number of submitted transactions.',
registers: [this.registry]
});

this.counterTxFinished = new prometheusClient.Counter({
name: 'caliper_tx_finished',
help: 'The total number of finished transactions.',
labelNames: ['final_status'],
registers: [this.registry]
});

// configure buckets
let buckets = prometheusClient.linearBuckets(0.1, 0.5, 10); // default
if (options.histogramBuckets) {
if (options.histogramBuckets.explicit) {
buckets = options.histogramBuckets.explicit;
} else if (options.histogramBuckets.linear) {
let linear = options.histogramBuckets.linear;
buckets = prometheusClient.linearBuckets(linear.start, linear.width, linear.count);
} else if (options.histogramBuckets.exponential) {
let exponential = options.histogramBuckets.exponential;
buckets = prometheusClient.exponentialBuckets(exponential.start, exponential.factor, exponential.count);
}
}

this.histogramLatency = new prometheusClient.Histogram({
name: 'caliper_tx_e2e_latency',
help: 'The histogram of end-to-end transaction latencies in seconds.',
labelNames: ['final_status'],
buckets,
registers: [this.registry]
});

// setting an interval enables the default metric collection
if (this.processMetricCollectInterval) {
this.processMetricHandle = prometheusClient.collectDefaultMetrics({
register: this.registry,
timestamps: false,
timeout: this.processMetricCollectInterval
});
const startGcStats = prometheusGcStats(this.registry);
startGcStats();
}

// configure server for Prometheus scrapes:
appServer.get(`${this.metricPath}`, async (req, res) => {
try {
res.set('Content-Type', this.registry.contentType);
res.end(await this.registry.metrics());
} catch (err) {
Logger.error(`Error in metrics provision within worker ${this.workerIndex}`, err.stack);
res.status(500).end(`Error collecting metrics from Hyperledger Caliper worker ${this.workerIndex}`);
}
});
}

/**
* Activates the TX observer instance, and in turn, the new TX statistics collector.
* @param {number} roundIndex The 0-based index of the current round.
* @param {string} roundLabel The roundLabel name.
*/
async activate(roundIndex, roundLabel) {
await super.activate(roundIndex, roundLabel);

// update worker and round metadata
this.defaultLabels.workerIndex = this.workerIndex;
this.defaultLabels.roundIndex = this.currentRound;
this.defaultLabels.roundLabel = this.roundLabel;
this.registry.setDefaultLabels(this.defaultLabels);

// Enable server
this.server = appServer.listen(this.scrapePort);
Logger.debug(`Enabled Prometheus scrape server on ${this.scrapePort}, with metrics exposed on ${this.metricPath} endpoint`);
}

/**
* Deactivates the TX observer interface, and resets all metric collectors
*/
async deactivate() {
await super.deactivate();
this.counterTxSubmitted.reset();
this.counterTxFinished.reset();
this.histogramLatency.reset();
this.registry.resetMetrics();

// Disable server
this.server.close();
}

/**
* Called when TXs are submitted.
* @param {number} count The number of submitted TXs. Can be greater than one for a batch of TXs.
*/
txSubmitted(count) {
this.counterTxSubmitted.inc(count);
}

/**
* Called when TXs are finished.
* @param {TxStatus | TxStatus[]} results The result information of the finished TXs. Can be a collection of results for a batch of TXs.
*/
txFinished(results) {
if (Array.isArray(results)) {
for (const result of results) {
// pass/fail status from result.GetStatus()
this.counterTxFinished.labels(result.GetStatus()).inc();
this.histogramLatency.labels(result.GetStatus()).observe(result.GetTimeFinal() - result.GetTimeCreate());
}
} else {
// pass/fail status from result.GetStatus()
this.counterTxFinished.labels(results.GetStatus()).inc();
this.histogramLatency.labels(results.GetStatus()).observe((results.GetTimeFinal() - results.GetTimeCreate())/1000);
}
}
}

/**
* Factory function for creating a PrometheusTxObserver instance.
* @param {object} options The observer configuration object.
* @param {MessengerInterface} messenger The worker messenger instance.
* @param {number} workerIndex The 0-based index of the worker node.
* @return {TxObserverInterface} The observer instance.
*/
function createTxObserver(options, messenger, workerIndex) {
return new PrometheusTxObserver(options, messenger, workerIndex);
}

module.exports.createTxObserver = createTxObserver;
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const CaliperUtils = require('../../common/utils/caliper-utils');

const builtInTxObservers = new Map([
['logging', path.join(__dirname, 'logging-tx-observer.js')],
// spoilers :) ['prometheus', path.join(__dirname, 'prometheus-tx-observer.js')],
['prometheus', path.join(__dirname, 'prometheus-tx-observer.js')],
['prometheus-push', path.join(__dirname, 'prometheus-push-tx-observer.js')]
]);

Expand All @@ -35,17 +35,18 @@ class TxObserverDispatch extends TxObserverInterface {
* @param {MessengerInterface} messenger The worker messenger instance.
* @param {TxObserverInterface} internalTxObserver The executor's internal TX observer instance.
* @param {string} managerUuid The UUID of the messenger for message sending.
* @param {number} workerIndex The 0-based index of the worker node.
*/
constructor(messenger, internalTxObserver, managerUuid) {
super(messenger);
constructor(messenger, internalTxObserver, managerUuid, workerIndex) {
super(messenger, workerIndex);
// contains the loaded TX observers
this.txObservers = [];

// load the configured TX observers
let observerConfigs = super.getDeclaredTxObservers();
for (let observer of observerConfigs) {
const factoryFunction = CaliperUtils.loadModuleFunction(builtInTxObservers, observer.module, 'createTxObserver');
this.txObservers.push(factoryFunction(observer.options, messenger, managerUuid));
this.txObservers.push(factoryFunction(observer.options, messenger, workerIndex));
}

// always load the internal TX observer
Expand All @@ -54,16 +55,15 @@ class TxObserverDispatch extends TxObserverInterface {

/**
* Activates the dispatch, and in turn, every configured TX observer instance.
* @param {number} workerIndex The 0-based index of the worker node.
* @param {number} roundIndex The 0-based index of the current round.
* @param {string} roundLabel The roundLabel name.
* @async
*/
async activate(workerIndex, roundIndex, roundLabel) {
await super.activate(workerIndex, roundIndex, roundLabel);
async activate(roundIndex, roundLabel) {
await super.activate(roundIndex, roundLabel);

for (let observer of this.txObservers) {
await observer.activate(workerIndex, roundIndex, roundLabel);
await observer.activate(roundIndex, roundLabel);
}
}

Expand Down
Loading

0 comments on commit 80a778c

Please sign in to comment.