Skip to content

Commit

Permalink
[Fleet] Fix duplicate data streams being shown in UI (#89812)
Browse files Browse the repository at this point in the history
* Add API integration tests for data streams list, including one that is expected to fail due to reliance on number of backing indices

* Use ES data streams API as source of truth for list of data streams, and only query against backing indices afterwards

* Get package name from data stream meta info

* Increase retry timeout

* Move initial info requests inside Promise.all

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
  • Loading branch information
jen-huang and kibanamachine authored Feb 2, 2021
1 parent 7191c4d commit 19effe2
Show file tree
Hide file tree
Showing 6 changed files with 344 additions and 127 deletions.
2 changes: 1 addition & 1 deletion x-pack/plugins/fleet/common/types/models/data_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export interface DataStream {
type: string;
package: string;
package_version: string;
last_activity: string;
last_activity_ms: number;
size_in_bytes: number;
dashboards: Array<{
id: string;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,14 @@ export const DataStreamListPage: React.FunctionComponent<{}> = () => {
},
},
{
field: 'last_activity',
field: 'last_activity_ms',
sortable: true,
width: '25%',
dataType: 'date',
name: i18n.translate('xpack.fleet.dataStreamList.lastActivityColumnTitle', {
defaultMessage: 'Last activity',
}),
render: (date: DataStream['last_activity']) => {
render: (date: DataStream['last_activity_ms']) => {
try {
const formatter = fieldFormats.getInstance('date');
return formatter.convert(date);
Expand Down
276 changes: 153 additions & 123 deletions x-pack/plugins/fleet/server/routes/data_streams/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,157 +4,187 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { RequestHandler, SavedObjectsClientContract } from 'src/core/server';
import { keyBy, keys, merge } from 'lodash';
import { DataStream } from '../../types';
import { GetDataStreamsResponse, KibanaAssetType, KibanaSavedObjectType } from '../../../common';
import { getPackageSavedObjects, getKibanaSavedObject } from '../../services/epm/packages/get';
import { defaultIngestErrorHandler } from '../../errors';

// Comma-separated patterns covering the logs, metrics, and traces naming
// schemes. Used as the index / data stream selector in the Elasticsearch
// requests issued by the list handler in this file.
const DATA_STREAM_INDEX_PATTERN = 'logs-*-*,metrics-*-*,traces-*-*';

/**
 * Shape the list handler expects back from `GET /_data_stream/<pattern>`.
 *
 * Only `name`, `indices` and `_meta.package.name` are read by the handler
 * visible in this file; the remaining fields are declared for completeness.
 */
interface ESDataStreamInfoResponse {
  data_streams: Array<{
    /** Data stream name; the handler keys the merged info/stats map by it. */
    name: string;
    timestamp_field: {
      name: string;
    };
    /** Backing indices; their `index_name` values are the search targets. */
    indices: Array<{
      index_name: string;
      index_uuid: string;
    }>;
    generation: number;
    /** Optional metadata; `package.name` supplies the package name. */
    _meta?: {
      package?: {
        name: string;
      };
      managed_by?: string;
      managed?: boolean;
      // Any other metadata keys are accepted untyped.
      [key: string]: any;
    };
    status: string;
    template: string;
    ilm_policy: string;
    hidden: boolean;
  }>;
}

/**
 * Shape the list handler expects back from
 * `GET /_data_stream/<pattern>/_stats`.
 */
interface ESDataStreamStatsResponse {
  data_streams: Array<{
    /** Data stream name; the handler keys the stats entries by this field. */
    data_stream: string;
    /** Declared for completeness; not read by the handler visible here. */
    backing_indices: number;
    /** Copied into the response as `size_in_bytes`. */
    store_size_bytes: number;
    /** Copied into the response as `last_activity_ms`. */
    maximum_timestamp: number;
  }>;
}

export const getListHandler: RequestHandler = async (context, request, response) => {
const callCluster = context.core.elasticsearch.legacy.client.callAsCurrentUser;
const body: GetDataStreamsResponse = {
data_streams: [],
};

try {
// Get stats (size on disk) of all potentially matching indices
const { indices: indexStats } = await callCluster('indices.stats', {
index: DATA_STREAM_INDEX_PATTERN,
metric: ['store'],
});
// Get matching data streams, their stats, and package SOs
const [
{ data_streams: dataStreamsInfo },
{ data_streams: dataStreamStats },
packageSavedObjects,
] = await Promise.all([
callCluster('transport.request', {
method: 'GET',
path: `/_data_stream/${DATA_STREAM_INDEX_PATTERN}`,
}) as Promise<ESDataStreamInfoResponse>,
callCluster('transport.request', {
method: 'GET',
path: `/_data_stream/${DATA_STREAM_INDEX_PATTERN}/_stats`,
}) as Promise<ESDataStreamStatsResponse>,
getPackageSavedObjects(context.core.savedObjects.client),
]);
const dataStreamsInfoByName = keyBy(dataStreamsInfo, 'name');
const dataStreamsStatsByName = keyBy(dataStreamStats, 'data_stream');

// Combine data stream info
const dataStreams = merge(dataStreamsInfoByName, dataStreamsStatsByName);
const dataStreamNames = keys(dataStreams);

// Map package SOs
const packageSavedObjectsByName = keyBy(packageSavedObjects.saved_objects, 'id');
const packageMetadata: any = {};

// Query additional information for each data stream
const dataStreamPromises = dataStreamNames.map(async (dataStreamName) => {
const dataStream = dataStreams[dataStreamName];
const dataStreamResponse: DataStream = {
index: dataStreamName,
dataset: '',
namespace: '',
type: '',
package: dataStream._meta?.package?.name || '',
package_version: '',
last_activity_ms: dataStream.maximum_timestamp,
size_in_bytes: dataStream.store_size_bytes,
dashboards: [],
};

// Get all matching indices and info about each
// This returns the top 100,000 indices (as buckets) by last activity
const { aggregations } = await callCluster('search', {
index: DATA_STREAM_INDEX_PATTERN,
body: {
size: 0,
query: {
bool: {
must: [
{
exists: {
field: 'data_stream.namespace',
// Query backing indices to extract data stream dataset, namespace, and type values
const {
aggregations: { dataset, namespace, type },
} = await callCluster('search', {
index: dataStream.indices.map((index) => index.index_name),
body: {
size: 0,
query: {
bool: {
must: [
{
exists: {
field: 'data_stream.namespace',
},
},
},
{
exists: {
field: 'data_stream.dataset',
{
exists: {
field: 'data_stream.dataset',
},
},
},
],
],
},
},
},
aggs: {
index: {
terms: {
field: '_index',
size: 100000,
order: {
last_activity: 'desc',
aggs: {
dataset: {
terms: {
field: 'data_stream.dataset',
size: 1,
},
},
aggs: {
dataset: {
terms: {
field: 'data_stream.dataset',
size: 1,
},
},
namespace: {
terms: {
field: 'data_stream.namespace',
size: 1,
},
namespace: {
terms: {
field: 'data_stream.namespace',
size: 1,
},
type: {
terms: {
field: 'data_stream.type',
size: 1,
},
},
last_activity: {
max: {
field: '@timestamp',
},
},
type: {
terms: {
field: 'data_stream.type',
size: 1,
},
},
},
},
},
});

const body: GetDataStreamsResponse = {
data_streams: [],
};

if (!(aggregations && aggregations.index && aggregations.index.buckets)) {
return response.ok({
body,
});
}

const {
index: { buckets: indexResults },
} = aggregations;

const packageSavedObjects = await getPackageSavedObjects(context.core.savedObjects.client);
const packageMetadata: any = {};

const dataStreamsPromises = (indexResults as any[]).map(async (result) => {
const {
key: indexName,
dataset: { buckets: datasetBuckets },
namespace: { buckets: namespaceBuckets },
type: { buckets: typeBuckets },
last_activity: { value_as_string: lastActivity },
} = result;

// We don't have a reliable way to associate index with package ID, so
// this is a hack to extract the package ID from the first part of the dataset name
// with fallback to extraction from index name
const pkg = datasetBuckets.length
? datasetBuckets[0].key.split('.')[0]
: indexName.split('-')[1].split('.')[0];
const pkgSavedObject = packageSavedObjects.saved_objects.filter((p) => p.id === pkg);

// if
// - the datastream is associated with a package
// - and the package has been installed through EPM
// - and we didn't pick the metadata in an earlier iteration of this map()
if (pkg !== '' && pkgSavedObject.length > 0 && !packageMetadata[pkg]) {
// then pick the dashboards from the package saved object
const dashboards =
pkgSavedObject[0].attributes?.installed_kibana?.filter(
(o) => o.type === KibanaSavedObjectType.dashboard
) || [];
// and then pick the human-readable titles from the dashboard saved objects
const enhancedDashboards = await getEnhancedDashboards(
context.core.savedObjects.client,
dashboards
);

packageMetadata[pkg] = {
version: pkgSavedObject[0].attributes?.version || '',
dashboards: enhancedDashboards,
};
// Set values from backing indices query
dataStreamResponse.dataset = dataset.buckets[0]?.key || '';
dataStreamResponse.namespace = namespace.buckets[0]?.key || '';
dataStreamResponse.type = type.buckets[0]?.key || '';

// Find package saved object
const pkgName = dataStreamResponse.package;
const pkgSavedObject = pkgName ? packageSavedObjectsByName[pkgName] : null;

if (pkgSavedObject) {
// if
// - the data stream is associated with a package
// - and the package has been installed through EPM
// - and we didn't pick the metadata in an earlier iteration of this map()
if (!packageMetadata[pkgName]) {
// then pick the dashboards from the package saved object
const dashboards =
pkgSavedObject.attributes?.installed_kibana?.filter(
(o) => o.type === KibanaSavedObjectType.dashboard
) || [];
// and then pick the human-readable titles from the dashboard saved objects
const enhancedDashboards = await getEnhancedDashboards(
context.core.savedObjects.client,
dashboards
);

packageMetadata[pkgName] = {
version: pkgSavedObject.attributes?.version || '',
dashboards: enhancedDashboards,
};
}

// Set values from package information
dataStreamResponse.package = pkgName;
dataStreamResponse.package_version = packageMetadata[pkgName].version;
dataStreamResponse.dashboards = packageMetadata[pkgName].dashboards;
}

return {
index: indexName,
dataset: datasetBuckets.length ? datasetBuckets[0].key : '',
namespace: namespaceBuckets.length ? namespaceBuckets[0].key : '',
type: typeBuckets.length ? typeBuckets[0].key : '',
package: pkgSavedObject.length ? pkg : '',
package_version: packageMetadata[pkg] ? packageMetadata[pkg].version : '',
last_activity: lastActivity,
size_in_bytes: indexStats[indexName] ? indexStats[indexName].total.store.size_in_bytes : 0,
dashboards: packageMetadata[pkg] ? packageMetadata[pkg].dashboards : [],
};
return dataStreamResponse;
});

const dataStreams: DataStream[] = await Promise.all(dataStreamsPromises);

body.data_streams = dataStreams;

// Return final data stream objects sorted by last activity, descending
// After filtering out data streams that are missing dataset/namespace/type fields
body.data_streams = (await Promise.all(dataStreamPromises))
.filter(({ dataset, namespace, type }) => dataset && namespace && type)
.sort((a, b) => b.last_activity_ms - a.last_activity_ms);
return response.ok({
body,
});
Expand Down
11 changes: 11 additions & 0 deletions x-pack/test/fleet_api_integration/apis/data_streams/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

/**
 * Registers the 'Data Stream Endpoints' suite and loads the sibling `./list`
 * test file into it.
 *
 * @param providerContext object supplying `loadTestFile`, which is called
 *   with the resolved path of the test file to load.
 */
export default function loadTests(providerContext) {
  const { loadTestFile } = providerContext;

  describe('Data Stream Endpoints', () => {
    const listTestsPath = require.resolve('./list');
    loadTestFile(listTestsPath);
  });
}
Loading

0 comments on commit 19effe2

Please sign in to comment.