Commit ce049c5: Reduce the number of buckets too

chrisronline committed Sep 25, 2019
1 parent d633131 commit ce049c5
Showing 7 changed files with 101 additions and 58 deletions.
@@ -11,7 +11,7 @@ import { checkParam } from '../error_missing_required';
import { getSeries } from './get_series';
import { calculateTimeseriesInterval } from '../calculate_timeseries_interval';

-export function getMetrics(req, indexPattern, metricSet = [], filters = []) {
+export function getMetrics(req, indexPattern, metricSet = [], filters = [], metricOptions = {}) {
checkParam(indexPattern, 'indexPattern in details/getMetrics');
checkParam(metricSet, 'metricSet in details/getMetrics');

@@ -33,7 +33,7 @@ export function getMetrics(req, indexPattern, metricSet = [], filters = []) {
}

return Promise.map(metricNames, metricName => {
-return getSeries(req, indexPattern, metricName, filters, { min, max, bucketSize });
+return getSeries(req, indexPattern, metricName, metricOptions, filters, { min, max, bucketSize });
});
})
.then(rows => {
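
Note on the change above: metricOptions rides along from getMetrics into getSeries so that per-request context (here, the current page of pipelines) can reach the aggregation builders. A minimal sketch of a call site, with a hypothetical caller and invented values; since the parameter defaults to {}, existing call sites keep working unchanged:

// Hypothetical caller (not part of this commit); req, lsIndexPattern,
// metricSet and filters are assumed to exist as in the surrounding code.
const metricOptions = {
  pageOfPipelines: [{ id: 'main', hash: 'a1b2', ephemeral_id: 'e-01' }] // invented values
};
const rows = await getMetrics(req, lsIndexPattern, metricSet, filters, metricOptions);
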
32 changes: 20 additions & 12 deletions x-pack/legacy/plugins/monitoring/server/lib/details/get_series.js
@@ -70,18 +70,27 @@ function createMetricAggs(metric) {
return metric.aggs;
}

-function fetchSeries(req, indexPattern, metric, min, max, bucketSize, filters) {
+function fetchSeries(req, indexPattern, metric, metricOptions, min, max, bucketSize, filters) {
// if we're using a derivative metric, offset the min (also @see comment on offsetMinForDerivativeMetric function)
const adjustedMin = metric.derivative ? offsetMinForDerivativeMetric(min, bucketSize) : min;

-const dateHistogramSubAggs = metric.dateHistogramSubAggs || {
-metric: {
-[metric.metricAgg]: {
-field: metric.field
-}
-},
-...createMetricAggs(metric)
-};
+let dateHistogramSubAggs = null;
+if (metric.getDateHistogramSubAggs) {
+dateHistogramSubAggs = metric.getDateHistogramSubAggs(metricOptions);
+}
+else if (metric.dateHistogramSubAggs) {
+dateHistogramSubAggs = metric.dateHistogramSubAggs;
+}
+else {
+dateHistogramSubAggs = {
+metric: {
+[metric.metricAgg]: {
+field: metric.field
+}
+},
+...createMetricAggs(metric)
+};
+}

const params = {
index: indexPattern,
@@ -184,7 +193,6 @@ function handleSeries(metric, min, max, bucketSizeInSeconds, response) {
const lastUsableBucketIndex = findLastUsableBucketIndex(buckets, max, firstUsableBucketIndex, bucketSizeInSeconds * 1000);
let data = [];

-
if (firstUsableBucketIndex <= lastUsableBucketIndex) {
// map buckets to values for charts
const key = derivative ? 'metric_deriv.normalized_value' : 'metric.value';
Expand Down Expand Up @@ -217,14 +225,14 @@ function handleSeries(metric, min, max, bucketSizeInSeconds, response) {
* @param {Array} filters Any filters that should be applied to the query.
* @return {Promise} The object response containing the {@code timeRange}, {@code metric}, and {@code data}.
*/
-export async function getSeries(req, indexPattern, metricName, filters, { min, max, bucketSize }) {
+export async function getSeries(req, indexPattern, metricName, metricOptions, filters, { min, max, bucketSize }) {
checkParam(indexPattern, 'indexPattern in details/getSeries');

const metric = metrics[metricName];
if (!metric) {
throw new Error(`Not a valid metric: ${metricName}`);
}
-const response = await fetchSeries(req, indexPattern, metric, min, max, bucketSize, filters);
+const response = await fetchSeries(req, indexPattern, metric, metricOptions, min, max, bucketSize, filters);

return handleSeries(metric, min, max, bucketSize, response);
}
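
With this change a metric can supply its date-histogram sub-aggregations in three ways, resolved in order: a getDateHistogramSubAggs(metricOptions) function, a static dateHistogramSubAggs object, or the default single-metric aggregation built from metricAgg and field. A sketch of a metric using the new hook; the metric below is invented for illustration and is not part of the commit:

// Hypothetical metric: builds its sub-aggs from the requested page so
// only the visible pipelines produce buckets in the date histogram.
const exampleMetric = {
  getDateHistogramSubAggs: ({ pageOfPipelines = [] }) => ({
    by_pipeline_id: {
      terms: {
        field: 'logstash_stats.pipelines.id',
        include: pageOfPipelines.map(pipeline => pipeline.id)
      },
      aggs: {
        metric: { max: { field: 'logstash_stats.pipelines.events.out' } }
      }
    }
  })
};
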
@@ -4,10 +4,11 @@
* you may not use this file except in compliance with the Elastic License.
*/
import moment from 'moment';
+import { get } from 'lodash';
import { createQuery } from '../create_query';
import { LogstashMetric } from '../metrics';

-export async function getLogstashPipelineIds(req, logstashIndexPattern, size) {
+export async function getLogstashPipelineIds(req, logstashIndexPattern) {
const start = moment.utc(req.payload.timeRange.min).valueOf();
const end = moment.utc(req.payload.timeRange.max).valueOf();

@@ -16,7 +17,7 @@ export async function getLogstashPipelineIds(req, logstashIndexPattern, size) {
size: 0,
ignoreUnavailable: true,
filterPath: [
-'aggregations.nested_context'
+'aggregations.nested_context.composite_data.buckets'
],
body: {
query: createQuery({
@@ -31,10 +32,31 @@ export async function getLogstashPipelineIds(req, logstashIndexPattern, size) {
path: 'logstash_stats.pipelines'
},
aggs: {
-pipelines: {
-terms: {
-field: 'logstash_stats.pipelines.id',
-size,
+composite_data: {
+composite: {
+sources: [
+{
+id: {
+terms: {
+field: 'logstash_stats.pipelines.id',
+}
+}
+},
+{
+hash: {
+terms: {
+field: 'logstash_stats.pipelines.hash',
+}
+}
+},
+{
+ephemeral_id: {
+terms: {
+field: 'logstash_stats.pipelines.ephemeral_id',
+}
+}
+}
+]
}
}
}
@@ -44,5 +66,6 @@ export async function getLogstashPipelineIds(req, logstashIndexPattern, size) {
};

const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring');
-return callWithRequest(req, 'search', params);
+const response = await callWithRequest(req, 'search', params);
+return get(response, 'aggregations.nested_context.composite_data.buckets', []).map(bucket => bucket.key);
}
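
getLogstashPipelineIds now returns pipeline descriptors directly instead of a raw search response: the terms aggregation capped by size is replaced with a composite aggregation keyed on id, hash, and ephemeral_id, so each bucket key is already an object carrying all three fields. (Composite aggregations return results in pages, 10 buckets per request by default, which can be walked with the after parameter.) A sketch of the response shape this code consumes, with invented values:

// Example response (values invented): each composite bucket key is an
// object assembled from the three sources declared above.
const response = {
  aggregations: {
    nested_context: {
      composite_data: {
        buckets: [
          { key: { id: 'main', hash: 'a1b2', ephemeral_id: 'e-01' }, doc_count: 42 },
          { key: { id: 'ingest', hash: 'c3d4', ephemeral_id: 'e-02' }, doc_count: 17 }
        ]
      }
    }
  }
};
// get(response, 'aggregations.nested_context.composite_data.buckets', [])
//   .map(bucket => bucket.key)
// => [{ id: 'main', hash: 'a1b2', ephemeral_id: 'e-01' },
//     { id: 'ingest', hash: 'c3d4', ephemeral_id: 'e-02' }]
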
@@ -74,7 +74,7 @@ export async function processPipelinesAPIResponse(response, throughputMetricKey,
}


-export async function getPipelines(req, logstashIndexPattern, pipelineIds, metricSet) {
+export async function getPipelines(req, logstashIndexPattern, pipelineIds, metricSet, metricOptions = {}) {
checkParam(logstashIndexPattern, 'logstashIndexPattern in logstash/getPipelines');
checkParam(metricSet, 'metricSet in logstash/getPipelines');

@@ -92,6 +92,6 @@ export async function getPipelines(req, logstashIndexPattern, pipelineIds, metri
});
}

-const metricsResponse = await getMetrics(req, logstashIndexPattern, metricSet, filters);
+const metricsResponse = await getMetrics(req, logstashIndexPattern, metricSet, filters, metricOptions);
return _handleResponse(metricsResponse, pipelineIds);
}
@@ -270,7 +270,7 @@ export class LogstashPipelineThroughputMetric extends LogstashMetric {
derivative: false
});

-this.dateHistogramSubAggs = {
+this.getDateHistogramSubAggs = ({ pageOfPipelines }) => ({
pipelines_nested: {
nested: {
path: 'logstash_stats.pipelines'
@@ -279,7 +279,7 @@
by_pipeline_id: {
terms: {
field: 'logstash_stats.pipelines.id',
-size: 1000
+include: pageOfPipelines.map(pipeline => pipeline.id),
},
aggs: {
throughput: {
@@ -290,7 +290,7 @@
by_pipeline_hash: {
terms: {
field: 'logstash_stats.pipelines.hash',
-size: 1000
+include: pageOfPipelines.map(pipeline => pipeline.hash),
},
aggs: {
throughput: {
@@ -301,7 +301,7 @@
by_ephemeral_id: {
terms: {
field: 'logstash_stats.pipelines.ephemeral_id',
-size: 1000
+include: pageOfPipelines.map(pipeline => pipeline.ephemeral_id),
},
aggs: {
events_stats: {
@@ -326,7 +326,7 @@
}
}
}
-};
+});

this.calculation = (bucket, _key, _metric, bucketSizeInSeconds) => {
const pipelineThroughputs = {};
@@ -353,32 +353,38 @@ export class LogstashPipelineNodeCountMetric extends LogstashMetric {
derivative: false
});

-this.dateHistogramSubAggs = {
-pipelines_nested: {
-nested: {
-path: 'logstash_stats.pipelines'
-},
-aggs: {
-by_pipeline_id: {
-terms: {
-field: 'logstash_stats.pipelines.id',
-size: 1000
-},
-aggs: {
-to_root: {
-reverse_nested: {},
-aggs: {
-node_count: {
-cardinality: {
-field: this.field
+this.getDateHistogramSubAggs = ({ pageOfPipelines }) => {
+const termAggExtras = {};
+if (pageOfPipelines) {
+termAggExtras.include = pageOfPipelines.map(pipeline => pipeline.id);
+}
+return {
+pipelines_nested: {
+nested: {
+path: 'logstash_stats.pipelines'
+},
+aggs: {
+by_pipeline_id: {
+terms: {
+field: 'logstash_stats.pipelines.id',
+...termAggExtras
+},
+aggs: {
+to_root: {
+reverse_nested: {},
+aggs: {
+node_count: {
+cardinality: {
+field: this.field
+}
+}
+}
+}
+}
+}
+}
+}
+}
+};
+};

this.calculation = bucket => {
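
Both metric classes above swap a terms aggregation sized at 1000 for one constrained with include, which accepts an array of exact terms (or a regular expression), so Elasticsearch only builds buckets for the pipelines on the requested page. In LogstashPipelineNodeCountMetric the include is applied conditionally because that metric may also be queried without page context. A sketch of the resulting aggregation for a two-pipeline page, with invented values:

// Invented page of pipelines, as produced by the route's pagination:
const pageOfPipelines = [
  { id: 'main', hash: 'a1b2', ephemeral_id: 'e-01' },
  { id: 'ingest', hash: 'c3d4', ephemeral_id: 'e-02' }
];
// terms + include limits bucket creation to exactly these ids:
const byPipelineId = {
  terms: {
    field: 'logstash_stats.pipelines.id',
    include: pageOfPipelines.map(pipeline => pipeline.id) // ['main', 'ingest']
  }
};
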
@@ -5,7 +5,7 @@
*/

import Joi from 'joi';
-import { get, sortByOrder } from 'lodash';
+import { sortByOrder } from 'lodash';
import { getClusterStatus } from '../../../../../lib/logstash/get_cluster_status';
import { getPipelines, processPipelinesAPIResponse } from '../../../../../lib/logstash/get_pipelines';
import { handleError } from '../../../../../lib/errors';
@@ -52,16 +52,15 @@
const clusterUuid = req.params.clusterUuid;
const lsIndexPattern = prefixIndexPattern(config, INDEX_PATTERN_LOGSTASH, ccs);

-const rawPipelines = await getLogstashPipelineIds(req, lsIndexPattern, config.get('xpack.monitoring.max_bucket_size'));
-const pipelines = get(rawPipelines, 'aggregations.nested_context.pipelines.buckets', []).map(bucket => ({ id: bucket.key }));
+const pipelines = await getLogstashPipelineIds(req, lsIndexPattern);

// Manually apply pagination/sorting/filtering concerns

// Filtering
const filteredPipelines = filter(pipelines, queryText, ['id']);

// Sorting
-const sortedPipelines = sortByOrder(filteredPipelines, pipeline => pipeline[sort.field], sort.direction);
+const sortedPipelines = sort ? sortByOrder(filteredPipelines, pipeline => pipeline[sort.field], sort.direction) : filteredPipelines;

// Pagination
const pageOfPipelines = paginate(pagination, sortedPipelines);
@@ -77,14 +76,18 @@
nodesCountMetric
];

+const metricOptions = {
+pageOfPipelines,
+};

try {
-const pipelineData = await getPipelines(req, lsIndexPattern, pipelineIds, metricSet);
+const pipelineData = await getPipelines(req, lsIndexPattern, pipelineIds, metricSet, metricOptions);
// We need to re-sort because the data from above comes back in random order
-const pipelinesResponse = sortByOrder(
+const pipelinesResponse = sort ? sortByOrder(
pipelineData,
pipeline => pipeline[sort.field],
sort.direction
-);
+) : pipelineData;
const response = await processPipelinesAPIResponse(
{
pipelines: pipelinesResponse,
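
Taken together, the handler now works in five steps: fetch every pipeline's { id, hash, ephemeral_id } via the composite aggregation, then filter, sort (guarded, since sort may be absent), and paginate in memory, and finally query metrics for only the visible page. A condensed sketch of that flow; filter and paginate are the route's existing helpers shown only by call shape, and pipelineIds is derived elsewhere in the handler:

// Condensed flow of the route handler (sketch):
const pipelines = await getLogstashPipelineIds(req, lsIndexPattern); // all pipelines
const filteredPipelines = filter(pipelines, queryText, ['id']);      // text filter
const sortedPipelines = sort
  ? sortByOrder(filteredPipelines, pipeline => pipeline[sort.field], sort.direction)
  : filteredPipelines;
const pageOfPipelines = paginate(pagination, sortedPipelines);       // current page
const pipelineData = await getPipelines(req, lsIndexPattern, pipelineIds, metricSet, {
  pageOfPipelines
});
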
@@ -5,7 +5,7 @@
*/

import Joi from 'joi';
-import { get, sortByOrder } from 'lodash';
+import { sortByOrder } from 'lodash';
import { getNodeInfo } from '../../../../../lib/logstash/get_node_info';
import { getPipelines, processPipelinesAPIResponse } from '../../../../../lib/logstash/get_pipelines';
import { handleError } from '../../../../../lib/errors';
@@ -52,8 +52,7 @@
const { clusterUuid, logstashUuid } = req.params;
const lsIndexPattern = prefixIndexPattern(config, INDEX_PATTERN_LOGSTASH, ccs);

-const rawPipelines = await getLogstashPipelineIds(req, lsIndexPattern, config.get('xpack.monitoring.max_bucket_size'));
-const pipelines = get(rawPipelines, 'aggregations.nested_context.pipelines.buckets', []).map(bucket => ({ id: bucket.key }));
+const pipelines = await getLogstashPipelineIds(req, lsIndexPattern);

// Manually apply pagination/sorting/filtering concerns

Expand All @@ -76,8 +75,12 @@ export function logstashNodePipelinesRoute(server) {
nodesCountMetric
];

+const metricOptions = {
+pageOfPipelines,
+};

try {
-const pipelineData = await getPipelines(req, lsIndexPattern, pipelineIds, metricSet);
+const pipelineData = await getPipelines(req, lsIndexPattern, pipelineIds, metricSet, metricOptions);
// We need to re-sort because the data from above comes back in random order
const pipelinesResponse = sortByOrder(
pipelineData,
