[Monitoring] Fix inaccuracies in logstash pipeline listing metrics #55868

Merged: 5 commits, Jan 28, 2020
@@ -34,6 +34,7 @@ import { checkCcrEnabled } from '../elasticsearch/ccr';
import { getStandaloneClusterDefinition, hasStandaloneClusters } from '../standalone_clusters';
import { getLogTypes } from '../logs';
import { isInCodePath } from './is_in_code_path';
import { getLogstashPipelineIds } from '../logstash/get_pipeline_ids';

/**
* Get all clusters or the cluster associated with {@code clusterUuid} when it is defined.
@@ -53,6 +54,8 @@ export async function getClustersFromRequest(
filebeatIndexPattern,
} = indexPatterns;

const config = req.server.config();
const size = config.get('xpack.monitoring.max_bucket_size');
const isStandaloneCluster = clusterUuid === STANDALONE_CLUSTER_CLUSTER_UUID;

let clusters = [];
@@ -158,25 +161,27 @@
});

// add logstash data
const logstashes = isInCodePath(codePaths, [CODE_PATH_LOGSTASH])
? await getLogstashForClusters(req, lsIndexPattern, clusters)
: [];

const clusterPipelineNodesCount = isInCodePath(codePaths, [CODE_PATH_LOGSTASH])
? await getPipelines(req, lsIndexPattern, null, ['logstash_cluster_pipeline_nodes_count'])
: [];

// add the logstash data to each cluster
logstashes.forEach(logstash => {
const clusterIndex = findIndex(clusters, { cluster_uuid: logstash.clusterUuid });

// withhold LS overview stats until pipeline metrics have at least one full bucket
if (logstash.clusterUuid === req.params.clusterUuid && clusterPipelineNodesCount.length === 0) {
logstash.stats = {};
}

set(clusters[clusterIndex], 'logstash', logstash.stats);
});
if (isInCodePath(codePaths, [CODE_PATH_LOGSTASH])) {
const logstashes = await getLogstashForClusters(req, lsIndexPattern, clusters);
const pipelines = await getLogstashPipelineIds(req, lsIndexPattern, { clusterUuid }, size);
const clusterPipelineNodesCount = await getPipelines(req, lsIndexPattern, pipelines, [
'logstash_cluster_pipeline_nodes_count',
]);
// add the logstash data to each cluster
logstashes.forEach(logstash => {
const clusterIndex = findIndex(clusters, { cluster_uuid: logstash.clusterUuid });

// withhold LS overview stats until pipeline metrics have at least one full bucket
if (
logstash.clusterUuid === req.params.clusterUuid &&
clusterPipelineNodesCount.length === 0
) {
logstash.stats = {};
}

set(clusters[clusterIndex], 'logstash', logstash.stats);
});
}
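
The conditional inside the loop preserves the behavior of the removed block: overview stats for the requested cluster are withheld until the pipeline node-count metric has at least one full bucket. A minimal standalone sketch of that rule (the helper name is invented for illustration, not part of the PR):

// Hypothetical helper illustrating the gating rule above.
function gateOverviewStats(logstash, requestedClusterUuid, clusterPipelineNodesCount) {
  const isRequestedCluster = logstash.clusterUuid === requestedClusterUuid;
  const hasFullBucket = clusterPipelineNodesCount.length > 0;
  // Withhold LS overview stats until pipeline metrics have at least one full bucket.
  return isRequestedCluster && !hasFullBucket ? { ...logstash, stats: {} } : logstash;
}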

// add beats data
const beatsByCluster = isInCodePath(codePaths, [CODE_PATH_BEATS])
@@ -199,7 +204,6 @@ export async function getClustersFromRequest(
// check ccr configuration
const isCcrEnabled = await checkCcrEnabled(req, esIndexPattern);

const config = req.server.config();
const kibanaUuid = config.get('server.uuid');

return getClustersSummary(req.server, clusters, kibanaUuid, isCcrEnabled);
@@ -5,14 +5,15 @@
*/

import expect from '@kbn/expect';
import { handleGetPipelinesResponse, processPipelinesAPIResponse } from '../get_pipelines';
import { processPipelinesAPIResponse } from '../get_pipelines';

describe('processPipelinesAPIResponse', () => {
let response;
beforeEach(() => {
response = {
pipelines: [
{
id: 1,
metrics: {
throughput_for_cluster: {
data: [
@@ -22,8 +23,8 @@ },
},
nodes_count_for_cluster: {
data: [
[1513721903, 3],
[1513722162, 2],
[1513721903, { 1: 5 }],
[1513722162, { 1: 10 }],
],
},
},
@@ -32,96 +33,27 @@ describe('processPipelinesAPIResponse', () => {
};
});

it('normalizes the metric keys', () => {
processPipelinesAPIResponse(response, 'throughput_for_cluster', 'nodes_count_for_cluster').then(
processedResponse => {
expect(processedResponse.pipelines[0].metrics.throughput).to.eql(
response.pipelines[0].metrics.throughput_for_cluster
);
expect(processedResponse.pipelines[0].metrics.nodesCount).to.eql(
response.pipelines[0].metrics.nodes_count_for_cluster
);
}
it('normalizes the metric keys', async () => {
const processedResponse = await processPipelinesAPIResponse(
response,
'throughput_for_cluster',
'nodes_count_for_cluster'
);
expect(processedResponse.pipelines[0].metrics.throughput).to.eql(
response.pipelines[0].metrics.throughput_for_cluster
);
expect(processedResponse.pipelines[0].metrics.nodesCount.data[0][0]).to.eql(1513721903);
expect(processedResponse.pipelines[0].metrics.nodesCount.data[0][1]).to.eql(5);
expect(processedResponse.pipelines[0].metrics.nodesCount.data[1][0]).to.eql(1513722162);
expect(processedResponse.pipelines[0].metrics.nodesCount.data[1][1]).to.eql(10);
});

it('computes the latest metrics', () => {
processPipelinesAPIResponse(response, 'throughput_for_cluster', 'nodes_count_for_cluster').then(
processedResponse => {
expect(processedResponse.pipelines[0].latestThroughput).to.eql(23);
expect(processedResponse.pipelines[0].latestNodesCount).to.eql(2);
expect(processedResponse.pipelines[0].latestNodesCount).to.eql(10);
}
);
});
});

describe('get_pipelines', () => {
let fetchPipelinesWithMetricsResult;

describe('fetchPipelinesWithMetrics result contains no pipelines', () => {
beforeEach(() => {
fetchPipelinesWithMetricsResult = {
logstash_pipeline_throughput: [
{
data: [],
},
],
logstash_pipeline_nodes_count: [
{
data: [],
},
],
};
});

it('returns an empty array', () => {
const result = handleGetPipelinesResponse(fetchPipelinesWithMetricsResult);
expect(result).to.eql([]);
});
});

describe('fetchPipelinesWithMetrics result contains pipelines', () => {
beforeEach(() => {
fetchPipelinesWithMetricsResult = {
logstash_pipeline_throughput: [
{
data: [[1513123151000, { apache_logs: 231, logstash_tweets: 34 }]],
},
],
logstash_pipeline_nodes_count: [
{
data: [[1513123151000, { apache_logs: 3, logstash_tweets: 1 }]],
},
],
};
});

it('returns the correct structure for a non-empty response', () => {
const result = handleGetPipelinesResponse(fetchPipelinesWithMetricsResult);
expect(result).to.eql([
{
id: 'apache_logs',
metrics: {
logstash_pipeline_throughput: {
data: [[1513123151000, 231]],
},
logstash_pipeline_nodes_count: {
data: [[1513123151000, 3]],
},
},
},
{
id: 'logstash_tweets',
metrics: {
logstash_pipeline_throughput: {
data: [[1513123151000, 34]],
},
logstash_pipeline_nodes_count: {
data: [[1513123151000, 1]],
},
},
},
]);
});
});
});
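
The updated fixture and assertions encode the new per-pipeline shape: each nodes-count data point now arrives as [timestamp, { [pipelineId]: count }] instead of [timestamp, count], and processing is expected to flatten it back to a scalar for the pipeline at hand, with the latest value taken from the last data point. A sketch of what the assertions imply (helper names are invented for illustration; the real logic lives in processPipelinesAPIResponse):

// Hypothetical helpers matching what the updated assertions expect.
function flattenNodesCount(series, pipelineId) {
  // [[1513721903, { 1: 5 }]] becomes [[1513721903, 5]] for pipeline id 1.
  return series.map(([timestamp, countsById]) => [timestamp, countsById[pipelineId]]);
}

function latestValue(series) {
  // latestNodesCount comes from the most recent flattened data point.
  return series.length ? series[series.length - 1][1] : undefined;
}

// latestValue(flattenNodesCount([[1513721903, { 1: 5 }], [1513722162, { 1: 10 }]], 1)) === 10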
@@ -7,7 +7,6 @@
import { get } from 'lodash';
import { filter } from '../pagination/filter';
import { getLogstashPipelineIds } from './get_pipeline_ids';
import { handleGetPipelinesResponse } from './get_pipelines';
import { sortPipelines } from './sort_pipelines';
import { paginate } from '../pagination/paginate';
import { getMetrics } from '../details/get_metrics';
@@ -51,19 +50,33 @@ export async function getPaginatedPipelines(
// the necessary sort - we only need the last bucket of data so we
// fetch the last two buckets of data (to ensure we have a single full bucket),
// then return the value from that last bucket
const metricSeriesData = await getMetrics(
req,
lsIndexPattern,
metricSet,
[],
{ pageOfPipelines: pipelines },
2
);
const pipelineAggregationsData = handleGetPipelinesResponse(
metricSeriesData,
pipelines.map(p => p.id)
const metricSeriesData = Object.values(
await Promise.all(
pipelines.map(pipeline => {
return new Promise(async resolve => {
const data = await getMetrics(
req,
lsIndexPattern,
metricSet,
[],
{
pipeline,
},
2
);

resolve({
id: pipeline.id,
metrics: Object.keys(data).reduce((accum, metricName) => {
accum[metricName] = data[metricName][0];
return accum;
}, {}),
});
});
})
)
);
for (const pipelineAggregationData of pipelineAggregationsData) {
for (const pipelineAggregationData of metricSeriesData) {
for (const pipeline of pipelines) {
if (pipelineAggregationData.id === pipeline.id) {
for (const metric of metricSet) {
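
Two readability notes on the new fetch loop: Promise.all already resolves to an array, so the surrounding Object.values(...) is effectively a no-op (it copies the array), and the new Promise(async resolve => ...) wrapper is redundant because an async mapper already returns a promise. A minimal equivalent sketch, reusing the variables in scope in the block above (not the code actually merged):

// Equivalent, slightly simplified form of the per-pipeline fetch above.
const metricSeriesData = await Promise.all(
  pipelines.map(async pipeline => {
    const data = await getMetrics(req, lsIndexPattern, metricSet, [], { pipeline }, 2);
    return {
      id: pipeline.id,
      // Keep only the first series returned for each metric name.
      metrics: Object.keys(data).reduce((accum, metricName) => {
        accum[metricName] = data[metricName][0];
        return accum;
      }, {}),
    };
  })
);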
@@ -4,7 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/
import moment from 'moment';
import { get, uniq } from 'lodash';
import { get } from 'lodash';
import { createQuery } from '../create_query';
import { LogstashMetric } from '../metrics';

@@ -26,7 +26,7 @@ export async function getLogstashPipelineIds(
index: logstashIndexPattern,
size: 0,
ignoreUnavailable: true,
filterPath: ['aggregations.nested_context.composite_data.buckets'],
filterPath: ['aggregations.nest.id.buckets'],
body: {
query: createQuery({
start,
@@ -36,37 +36,28 @@ export async function getLogstashPipelineIds(
filters,
}),
aggs: {
nested_context: {
nest: {
nested: {
path: 'logstash_stats.pipelines',
},
aggs: {
composite_data: {
composite: {
id: {
terms: {
field: 'logstash_stats.pipelines.id',
size,
sources: [
{
id: {
terms: {
field: 'logstash_stats.pipelines.id',
},
},
},
{
hash: {
terms: {
field: 'logstash_stats.pipelines.hash',
},
},
},
{
ephemeral_id: {
},
aggs: {
unnest: {
reverse_nested: {},
aggs: {
nodes: {
terms: {
field: 'logstash_stats.pipelines.ephemeral_id',
field: 'logstash_stats.logstash.uuid',
size,
},
},
},
],
},
},
},
},
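
The interleaved old and new lines above are easier to follow in condensed form: the composite aggregation over (id, hash, ephemeral_id) sources is replaced by a plain terms aggregation on pipeline id, and a reverse_nested step climbs back out of the pipelines nested context so the node uuids owning each pipeline can be collected. A condensed sketch of the new aggregation tree, with field names as in the diff and size as passed to getLogstashPipelineIds:

// Condensed shape of the new aggregation added by this PR.
const aggs = {
  nest: {
    nested: { path: 'logstash_stats.pipelines' },
    aggs: {
      id: {
        terms: { field: 'logstash_stats.pipelines.id', size },
        aggs: {
          unnest: {
            // reverse_nested leaves the nested pipelines docs, so the inner
            // terms agg buckets on the parent stats document's node uuid.
            reverse_nested: {},
            aggs: {
              nodes: { terms: { field: 'logstash_stats.logstash.uuid', size } },
            },
          },
        },
      },
    },
  },
};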
@@ -77,8 +68,8 @@

const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring');
const response = await callWithRequest(req, 'search', params);
const data = get(response, 'aggregations.nested_context.composite_data.buckets', []).map(
bucket => bucket.key
);
return uniq(data, item => item.id);
return get(response, 'aggregations.nest.id.buckets', []).map(bucket => ({
id: bucket.key,
nodeIds: get(bucket, 'unnest.nodes.buckets', []).map(item => item.key),
}));
}
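
Given that aggregation, the function now returns one entry per pipeline id together with the uuids of the Logstash nodes running it. An illustrative example (response values invented; the mapping is the one in the return statement above):

// Assumed example response shape for the new aggregation.
const exampleResponse = {
  aggregations: {
    nest: {
      id: {
        buckets: [
          {
            key: 'apache_logs',
            unnest: { nodes: { buckets: [{ key: 'node-uuid-1' }, { key: 'node-uuid-2' }] } },
          },
        ],
      },
    },
  },
};
// getLogstashPipelineIds maps this to:
// [{ id: 'apache_logs', nodeIds: ['node-uuid-1', 'node-uuid-2'] }]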