From ce049c546a6d243f020443aa079354d578e6e877 Mon Sep 17 00:00:00 2001 From: chrisronline Date: Wed, 25 Sep 2019 16:42:25 -0400 Subject: [PATCH] Reduce the number of buckets too --- .../server/lib/details/get_metrics.js | 4 +- .../server/lib/details/get_series.js | 32 ++++++----- .../server/lib/logstash/get_pipeline_ids.js | 37 ++++++++++--- .../server/lib/logstash/get_pipelines.js | 4 +- .../server/lib/metrics/logstash/classes.js | 54 ++++++++++--------- .../logstash/pipelines/cluster_pipelines.js | 17 +++--- .../v1/logstash/pipelines/node_pipelines.js | 11 ++-- 7 files changed, 101 insertions(+), 58 deletions(-) diff --git a/x-pack/legacy/plugins/monitoring/server/lib/details/get_metrics.js b/x-pack/legacy/plugins/monitoring/server/lib/details/get_metrics.js index 57c936f960212b..2e35088aa01ce0 100644 --- a/x-pack/legacy/plugins/monitoring/server/lib/details/get_metrics.js +++ b/x-pack/legacy/plugins/monitoring/server/lib/details/get_metrics.js @@ -11,7 +11,7 @@ import { checkParam } from '../error_missing_required'; import { getSeries } from './get_series'; import { calculateTimeseriesInterval } from '../calculate_timeseries_interval'; -export function getMetrics(req, indexPattern, metricSet = [], filters = []) { +export function getMetrics(req, indexPattern, metricSet = [], filters = [], metricOptions = {}) { checkParam(indexPattern, 'indexPattern in details/getMetrics'); checkParam(metricSet, 'metricSet in details/getMetrics'); @@ -33,7 +33,7 @@ export function getMetrics(req, indexPattern, metricSet = [], filters = []) { } return Promise.map(metricNames, metricName => { - return getSeries(req, indexPattern, metricName, filters, { min, max, bucketSize }); + return getSeries(req, indexPattern, metricName, metricOptions, filters, { min, max, bucketSize }); }); }) .then(rows => { diff --git a/x-pack/legacy/plugins/monitoring/server/lib/details/get_series.js b/x-pack/legacy/plugins/monitoring/server/lib/details/get_series.js index 306e93273c157d..03613b39a71e92 100644 --- a/x-pack/legacy/plugins/monitoring/server/lib/details/get_series.js +++ b/x-pack/legacy/plugins/monitoring/server/lib/details/get_series.js @@ -70,18 +70,27 @@ function createMetricAggs(metric) { return metric.aggs; } -function fetchSeries(req, indexPattern, metric, min, max, bucketSize, filters) { +function fetchSeries(req, indexPattern, metric, metricOptions, min, max, bucketSize, filters) { // if we're using a derivative metric, offset the min (also @see comment on offsetMinForDerivativeMetric function) const adjustedMin = metric.derivative ? offsetMinForDerivativeMetric(min, bucketSize) : min; - const dateHistogramSubAggs = metric.dateHistogramSubAggs || { - metric: { - [metric.metricAgg]: { - field: metric.field - } - }, - ...createMetricAggs(metric) - }; + let dateHistogramSubAggs = null; + if (metric.getDateHistogramSubAggs) { + dateHistogramSubAggs = metric.getDateHistogramSubAggs(metricOptions); + } + else if (metric.dateHistogramSubAggs) { + dateHistogramSubAggs = metric.dateHistogramSubAggs; + } + else { + dateHistogramSubAggs = { + metric: { + [metric.metricAgg]: { + field: metric.field + } + }, + ...createMetricAggs(metric) + }; + } const params = { index: indexPattern, @@ -184,7 +193,6 @@ function handleSeries(metric, min, max, bucketSizeInSeconds, response) { const lastUsableBucketIndex = findLastUsableBucketIndex(buckets, max, firstUsableBucketIndex, bucketSizeInSeconds * 1000); let data = []; - if (firstUsableBucketIndex <= lastUsableBucketIndex) { // map buckets to values for charts const key = derivative ? 'metric_deriv.normalized_value' : 'metric.value'; @@ -217,14 +225,14 @@ function handleSeries(metric, min, max, bucketSizeInSeconds, response) { * @param {Array} filters Any filters that should be applied to the query. * @return {Promise} The object response containing the {@code timeRange}, {@code metric}, and {@code data}. */ -export async function getSeries(req, indexPattern, metricName, filters, { min, max, bucketSize }) { +export async function getSeries(req, indexPattern, metricName, metricOptions, filters, { min, max, bucketSize }) { checkParam(indexPattern, 'indexPattern in details/getSeries'); const metric = metrics[metricName]; if (!metric) { throw new Error(`Not a valid metric: ${metricName}`); } - const response = await fetchSeries(req, indexPattern, metric, min, max, bucketSize, filters); + const response = await fetchSeries(req, indexPattern, metric, metricOptions, min, max, bucketSize, filters); return handleSeries(metric, min, max, bucketSize, response); } diff --git a/x-pack/legacy/plugins/monitoring/server/lib/logstash/get_pipeline_ids.js b/x-pack/legacy/plugins/monitoring/server/lib/logstash/get_pipeline_ids.js index f4a97a78e97c2a..9a8d21ae9e151a 100644 --- a/x-pack/legacy/plugins/monitoring/server/lib/logstash/get_pipeline_ids.js +++ b/x-pack/legacy/plugins/monitoring/server/lib/logstash/get_pipeline_ids.js @@ -4,10 +4,11 @@ * you may not use this file except in compliance with the Elastic License. */ import moment from 'moment'; +import { get } from 'lodash'; import { createQuery } from '../create_query'; import { LogstashMetric } from '../metrics'; -export async function getLogstashPipelineIds(req, logstashIndexPattern, size) { +export async function getLogstashPipelineIds(req, logstashIndexPattern) { const start = moment.utc(req.payload.timeRange.min).valueOf(); const end = moment.utc(req.payload.timeRange.max).valueOf(); @@ -16,7 +17,7 @@ export async function getLogstashPipelineIds(req, logstashIndexPattern, size) { size: 0, ignoreUnavailable: true, filterPath: [ - 'aggregations.nested_context' + 'aggregations.nested_context.composite_data.buckets' ], body: { query: createQuery({ @@ -31,10 +32,31 @@ export async function getLogstashPipelineIds(req, logstashIndexPattern, size) { path: 'logstash_stats.pipelines' }, aggs: { - pipelines: { - terms: { - field: 'logstash_stats.pipelines.id', - size, + composite_data: { + composite: { + sources: [ + { + id: { + terms: { + field: 'logstash_stats.pipelines.id', + } + } + }, + { + hash: { + terms: { + field: 'logstash_stats.pipelines.hash', + } + } + }, + { + ephemeral_id: { + terms: { + field: 'logstash_stats.pipelines.ephemeral_id', + } + } + } + ] } } } @@ -44,5 +66,6 @@ export async function getLogstashPipelineIds(req, logstashIndexPattern, size) { }; const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring'); - return callWithRequest(req, 'search', params); + const response = await callWithRequest(req, 'search', params); + return get(response, 'aggregations.nested_context.composite_data.buckets', []).map(bucket => bucket.key); } diff --git a/x-pack/legacy/plugins/monitoring/server/lib/logstash/get_pipelines.js b/x-pack/legacy/plugins/monitoring/server/lib/logstash/get_pipelines.js index 3b7c3fdb039d8a..7aad52a749b239 100644 --- a/x-pack/legacy/plugins/monitoring/server/lib/logstash/get_pipelines.js +++ b/x-pack/legacy/plugins/monitoring/server/lib/logstash/get_pipelines.js @@ -74,7 +74,7 @@ export async function processPipelinesAPIResponse(response, throughputMetricKey, } -export async function getPipelines(req, logstashIndexPattern, pipelineIds, metricSet) { +export async function getPipelines(req, logstashIndexPattern, pipelineIds, metricSet, metricOptions = {}) { checkParam(logstashIndexPattern, 'logstashIndexPattern in logstash/getPipelines'); checkParam(metricSet, 'metricSet in logstash/getPipelines'); @@ -92,6 +92,6 @@ export async function getPipelines(req, logstashIndexPattern, pipelineIds, metri }); } - const metricsResponse = await getMetrics(req, logstashIndexPattern, metricSet, filters); + const metricsResponse = await getMetrics(req, logstashIndexPattern, metricSet, filters, metricOptions); return _handleResponse(metricsResponse, pipelineIds); } diff --git a/x-pack/legacy/plugins/monitoring/server/lib/metrics/logstash/classes.js b/x-pack/legacy/plugins/monitoring/server/lib/metrics/logstash/classes.js index dade736cd53f85..22e2a16257cb41 100644 --- a/x-pack/legacy/plugins/monitoring/server/lib/metrics/logstash/classes.js +++ b/x-pack/legacy/plugins/monitoring/server/lib/metrics/logstash/classes.js @@ -270,7 +270,7 @@ export class LogstashPipelineThroughputMetric extends LogstashMetric { derivative: false }); - this.dateHistogramSubAggs = { + this.getDateHistogramSubAggs = ({ pageOfPipelines }) => ({ pipelines_nested: { nested: { path: 'logstash_stats.pipelines' @@ -279,7 +279,7 @@ export class LogstashPipelineThroughputMetric extends LogstashMetric { by_pipeline_id: { terms: { field: 'logstash_stats.pipelines.id', - size: 1000 + include: pageOfPipelines.map(pipeline => pipeline.id), }, aggs: { throughput: { @@ -290,7 +290,7 @@ export class LogstashPipelineThroughputMetric extends LogstashMetric { by_pipeline_hash: { terms: { field: 'logstash_stats.pipelines.hash', - size: 1000 + include: pageOfPipelines.map(pipeline => pipeline.hash), }, aggs: { throughput: { @@ -301,7 +301,7 @@ export class LogstashPipelineThroughputMetric extends LogstashMetric { by_ephemeral_id: { terms: { field: 'logstash_stats.pipelines.ephemeral_id', - size: 1000 + include: pageOfPipelines.map(pipeline => pipeline.ephemeral_id), }, aggs: { events_stats: { @@ -326,7 +326,7 @@ export class LogstashPipelineThroughputMetric extends LogstashMetric { } } } - }; + }); this.calculation = (bucket, _key, _metric, bucketSizeInSeconds) => { const pipelineThroughputs = {}; @@ -353,24 +353,30 @@ export class LogstashPipelineNodeCountMetric extends LogstashMetric { derivative: false }); - this.dateHistogramSubAggs = { - pipelines_nested: { - nested: { - path: 'logstash_stats.pipelines' - }, - aggs: { - by_pipeline_id: { - terms: { - field: 'logstash_stats.pipelines.id', - size: 1000 - }, - aggs: { - to_root: { - reverse_nested: {}, - aggs: { - node_count: { - cardinality: { - field: this.field + this.getDateHistogramSubAggs = ({ pageOfPipelines }) => { + const termAggExtras = {}; + if (pageOfPipelines) { + termAggExtras.include = pageOfPipelines.map(pipeline => pipeline.id); + } + return { + pipelines_nested: { + nested: { + path: 'logstash_stats.pipelines' + }, + aggs: { + by_pipeline_id: { + terms: { + field: 'logstash_stats.pipelines.id', + ...termAggExtras + }, + aggs: { + to_root: { + reverse_nested: {}, + aggs: { + node_count: { + cardinality: { + field: this.field + } } } } @@ -378,7 +384,7 @@ export class LogstashPipelineNodeCountMetric extends LogstashMetric { } } } - } + }; }; this.calculation = bucket => { diff --git a/x-pack/legacy/plugins/monitoring/server/routes/api/v1/logstash/pipelines/cluster_pipelines.js b/x-pack/legacy/plugins/monitoring/server/routes/api/v1/logstash/pipelines/cluster_pipelines.js index 02f8aaa6718a27..efb0c881c75b62 100644 --- a/x-pack/legacy/plugins/monitoring/server/routes/api/v1/logstash/pipelines/cluster_pipelines.js +++ b/x-pack/legacy/plugins/monitoring/server/routes/api/v1/logstash/pipelines/cluster_pipelines.js @@ -5,7 +5,7 @@ */ import Joi from 'joi'; -import { get, sortByOrder } from 'lodash'; +import { sortByOrder } from 'lodash'; import { getClusterStatus } from '../../../../../lib/logstash/get_cluster_status'; import { getPipelines, processPipelinesAPIResponse } from '../../../../../lib/logstash/get_pipelines'; import { handleError } from '../../../../../lib/errors'; @@ -52,8 +52,7 @@ export function logstashClusterPipelinesRoute(server) { const clusterUuid = req.params.clusterUuid; const lsIndexPattern = prefixIndexPattern(config, INDEX_PATTERN_LOGSTASH, ccs); - const rawPipelines = await getLogstashPipelineIds(req, lsIndexPattern, config.get('xpack.monitoring.max_bucket_size')); - const pipelines = get(rawPipelines, 'aggregations.nested_context.pipelines.buckets', []).map(bucket => ({ id: bucket.key })); + const pipelines = await getLogstashPipelineIds(req, lsIndexPattern); // Manually apply pagination/sorting/filtering concerns @@ -61,7 +60,7 @@ export function logstashClusterPipelinesRoute(server) { const filteredPipelines = filter(pipelines, queryText, ['id']); // Sorting - const sortedPipelines = sortByOrder(filteredPipelines, pipeline => pipeline[sort.field], sort.direction); + const sortedPipelines = sort ? sortByOrder(filteredPipelines, pipeline => pipeline[sort.field], sort.direction) : filteredPipelines; // Pagination const pageOfPipelines = paginate(pagination, sortedPipelines); @@ -77,14 +76,18 @@ export function logstashClusterPipelinesRoute(server) { nodesCountMetric ]; + const metricOptions = { + pageOfPipelines, + }; + try { - const pipelineData = await getPipelines(req, lsIndexPattern, pipelineIds, metricSet); + const pipelineData = await getPipelines(req, lsIndexPattern, pipelineIds, metricSet, metricOptions); // We need to re-sort because the data from above comes back in random order - const pipelinesResponse = sortByOrder( + const pipelinesResponse = sort ? sortByOrder( pipelineData, pipeline => pipeline[sort.field], sort.direction - ); + ) : pipelineData; const response = await processPipelinesAPIResponse( { pipelines: pipelinesResponse, diff --git a/x-pack/legacy/plugins/monitoring/server/routes/api/v1/logstash/pipelines/node_pipelines.js b/x-pack/legacy/plugins/monitoring/server/routes/api/v1/logstash/pipelines/node_pipelines.js index f36eebd40a4111..8aa551ac875031 100644 --- a/x-pack/legacy/plugins/monitoring/server/routes/api/v1/logstash/pipelines/node_pipelines.js +++ b/x-pack/legacy/plugins/monitoring/server/routes/api/v1/logstash/pipelines/node_pipelines.js @@ -5,7 +5,7 @@ */ import Joi from 'joi'; -import { get, sortByOrder } from 'lodash'; +import { sortByOrder } from 'lodash'; import { getNodeInfo } from '../../../../../lib/logstash/get_node_info'; import { getPipelines, processPipelinesAPIResponse } from '../../../../../lib/logstash/get_pipelines'; import { handleError } from '../../../../../lib/errors'; @@ -52,8 +52,7 @@ export function logstashNodePipelinesRoute(server) { const { clusterUuid, logstashUuid } = req.params; const lsIndexPattern = prefixIndexPattern(config, INDEX_PATTERN_LOGSTASH, ccs); - const rawPipelines = await getLogstashPipelineIds(req, lsIndexPattern, config.get('xpack.monitoring.max_bucket_size')); - const pipelines = get(rawPipelines, 'aggregations.nested_context.pipelines.buckets', []).map(bucket => ({ id: bucket.key })); + const pipelines = await getLogstashPipelineIds(req, lsIndexPattern); // Manually apply pagination/sorting/filtering concerns @@ -76,8 +75,12 @@ export function logstashNodePipelinesRoute(server) { nodesCountMetric ]; + const metricOptions = { + pageOfPipelines, + }; + try { - const pipelineData = await getPipelines(req, lsIndexPattern, pipelineIds, metricSet); + const pipelineData = await getPipelines(req, lsIndexPattern, pipelineIds, metricSet, metricOptions); // We need to re-sort because the data from above comes back in random order const pipelinesResponse = sortByOrder( pipelineData,