Skip to content

Commit

Permalink
[Fleet] Update data streams mappings directly instead of against back…
Browse files Browse the repository at this point in the history
…ing indices (#89660)

* Update data streams mappings directly instead of querying for backing indices, update integration tests to test with multiple namespaces

* Add flag to only update mappings of the current write index

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
  • Loading branch information
jen-huang and kibanamachine authored Feb 1, 2021
1 parent e4c344a commit cb16a5c
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import {
TemplateRef,
IndexTemplate,
IndexTemplateMappings,
DataType,
} from '../../../../types';
import { getRegistryDataStreamAssetBaseName } from '../index';

Expand All @@ -26,8 +25,8 @@ interface MultiFields {
export interface IndexTemplateMapping {
[key: string]: any;
}
export interface CurrentIndex {
indexName: string;
export interface CurrentDataStream {
dataStreamName: string;
indexTemplate: IndexTemplate;
}
const DEFAULT_SCALING_FACTOR = 1000;
Expand Down Expand Up @@ -348,60 +347,60 @@ export const updateCurrentWriteIndices = async (
): Promise<void> => {
if (!templates.length) return;

const allIndices = await queryIndicesFromTemplates(callCluster, templates);
const allIndices = await queryDataStreamsFromTemplates(callCluster, templates);
if (!allIndices.length) return;
return updateAllIndices(allIndices, callCluster);
return updateAllDataStreams(allIndices, callCluster);
};

function isCurrentIndex(item: CurrentIndex[] | undefined): item is CurrentIndex[] {
function isCurrentDataStream(item: CurrentDataStream[] | undefined): item is CurrentDataStream[] {
return item !== undefined;
}

const queryIndicesFromTemplates = async (
const queryDataStreamsFromTemplates = async (
callCluster: CallESAsCurrentUser,
templates: TemplateRef[]
): Promise<CurrentIndex[]> => {
const indexPromises = templates.map((template) => {
return getIndices(callCluster, template);
): Promise<CurrentDataStream[]> => {
const dataStreamPromises = templates.map((template) => {
return getDataStreams(callCluster, template);
});
const indexObjects = await Promise.all(indexPromises);
return indexObjects.filter(isCurrentIndex).flat();
const dataStreamObjects = await Promise.all(dataStreamPromises);
return dataStreamObjects.filter(isCurrentDataStream).flat();
};

const getIndices = async (
const getDataStreams = async (
callCluster: CallESAsCurrentUser,
template: TemplateRef
): Promise<CurrentIndex[] | undefined> => {
): Promise<CurrentDataStream[] | undefined> => {
const { templateName, indexTemplate } = template;
// Until ES provides a way to update mappings of a data stream
// get the last index of the data stream, which is the current write index
const res = await callCluster('transport.request', {
method: 'GET',
path: `/_data_stream/${templateName}-*`,
});
const dataStreams = res.data_streams;
if (!dataStreams.length) return;
return dataStreams.map((dataStream: any) => ({
indexName: dataStream.indices[dataStream.indices.length - 1].index_name,
dataStreamName: dataStream.name,
indexTemplate,
}));
};

const updateAllIndices = async (
indexNameWithTemplates: CurrentIndex[],
const updateAllDataStreams = async (
indexNameWithTemplates: CurrentDataStream[],
callCluster: CallESAsCurrentUser
): Promise<void> => {
const updateIndexPromises = indexNameWithTemplates.map(({ indexName, indexTemplate }) => {
return updateExistingIndex({ indexName, callCluster, indexTemplate });
});
await Promise.all(updateIndexPromises);
const updatedataStreamPromises = indexNameWithTemplates.map(
({ dataStreamName, indexTemplate }) => {
return updateExistingDataStream({ dataStreamName, callCluster, indexTemplate });
}
);
await Promise.all(updatedataStreamPromises);
};
const updateExistingIndex = async ({
indexName,
const updateExistingDataStream = async ({
dataStreamName,
callCluster,
indexTemplate,
}: {
indexName: string;
dataStreamName: string;
callCluster: CallESAsCurrentUser;
indexTemplate: IndexTemplate;
}) => {
Expand All @@ -416,53 +415,13 @@ const updateExistingIndex = async ({
// try to update the mappings first
try {
await callCluster('indices.putMapping', {
index: indexName,
index: dataStreamName,
body: mappings,
write_index_only: true,
});
// if update fails, rollover data stream
} catch (err) {
try {
// get the data_stream values to compose datastream name
const searchDataStreamFieldsResponse = await callCluster('search', {
index: indexTemplate.index_patterns[0],
body: {
size: 1,
_source: ['data_stream.namespace', 'data_stream.type', 'data_stream.dataset'],
query: {
bool: {
filter: [
{
exists: {
field: 'data_stream.type',
},
},
{
exists: {
field: 'data_stream.dataset',
},
},
{
exists: {
field: 'data_stream.namespace',
},
},
],
},
},
},
});
if (searchDataStreamFieldsResponse.hits.total.value === 0)
throw new Error('data_stream fields are missing from datastream indices');
const {
dataset,
namespace,
type,
}: {
dataset: string;
namespace: string;
type: DataType;
} = searchDataStreamFieldsResponse.hits.hits[0]._source.data_stream;
const dataStreamName = `${type}-${dataset}-${namespace}`;
const path = `/${dataStreamName}/_rollover`;
await callCluster('transport.request', {
method: 'POST',
Expand All @@ -478,10 +437,10 @@ const updateExistingIndex = async ({
if (!settings.index.default_pipeline) return;
try {
await callCluster('indices.putSettings', {
index: indexName,
index: dataStreamName,
body: { index: { default_pipeline: settings.index.default_pipeline } },
});
} catch (err) {
throw new Error(`could not update index template settings for ${indexName}`);
throw new Error(`could not update index template settings for ${dataStreamName}`);
}
};
148 changes: 83 additions & 65 deletions x-pack/test/fleet_api_integration/apis/epm/data_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@ export default function (providerContext: FtrProviderContext) {
const { getService } = providerContext;
const supertest = getService('supertest');
const es = getService('es');
const dockerServers = getService('dockerServers');
const server = dockerServers.get('registry');
const pkgName = 'datastreams';
const pkgVersion = '0.1.0';
const pkgUpdateVersion = '0.2.0';
const pkgKey = `${pkgName}-${pkgVersion}`;
const pkgUpdateKey = `${pkgName}-${pkgUpdateVersion}`;
const logsTemplateName = `logs-${pkgName}.test_logs`;
const metricsTemplateName = `metrics-${pkgName}.test_metrics`;
const namespaces = ['default', 'foo', 'bar'];

const uninstallPackage = async (pkg: string) => {
await supertest.delete(`/api/fleet/epm/packages/${pkg}`).set('kbn-xsrf', 'xxxx');
Expand All @@ -35,86 +34,105 @@ export default function (providerContext: FtrProviderContext) {

describe('datastreams', async () => {
skipIfNoDockerRegistry(providerContext);

beforeEach(async () => {
await installPackage(pkgKey);
await es.transport.request({
method: 'POST',
path: `/${logsTemplateName}-default/_doc`,
body: {
'@timestamp': '2015-01-01',
logs_test_name: 'test',
data_stream: {
dataset: `${pkgName}.test_logs`,
namespace: 'default',
type: 'logs',
},
},
});
await es.transport.request({
method: 'POST',
path: `/${metricsTemplateName}-default/_doc`,
body: {
'@timestamp': '2015-01-01',
logs_test_name: 'test',
data_stream: {
dataset: `${pkgName}.test_metrics`,
namespace: 'default',
type: 'metrics',
},
},
});
await Promise.all(
namespaces.map(async (namespace) => {
const createLogsRequest = es.transport.request({
method: 'POST',
path: `/${logsTemplateName}-${namespace}/_doc`,
body: {
'@timestamp': '2015-01-01',
logs_test_name: 'test',
data_stream: {
dataset: `${pkgName}.test_logs`,
namespace,
type: 'logs',
},
},
});
const createMetricsRequest = es.transport.request({
method: 'POST',
path: `/${metricsTemplateName}-${namespace}/_doc`,
body: {
'@timestamp': '2015-01-01',
logs_test_name: 'test',
data_stream: {
dataset: `${pkgName}.test_metrics`,
namespace,
type: 'metrics',
},
},
});
return Promise.all([createLogsRequest, createMetricsRequest]);
})
);
});

afterEach(async () => {
if (!server.enabled) return;
await es.transport.request({
method: 'DELETE',
path: `/_data_stream/${logsTemplateName}-default`,
});
await es.transport.request({
method: 'DELETE',
path: `/_data_stream/${metricsTemplateName}-default`,
});
await Promise.all(
namespaces.map(async (namespace) => {
const deleteLogsRequest = es.transport.request({
method: 'DELETE',
path: `/_data_stream/${logsTemplateName}-${namespace}`,
});
const deleteMetricsRequest = es.transport.request({
method: 'DELETE',
path: `/_data_stream/${metricsTemplateName}-${namespace}`,
});
return Promise.all([deleteLogsRequest, deleteMetricsRequest]);
})
);
await uninstallPackage(pkgKey);
await uninstallPackage(pkgUpdateKey);
});

it('should list the logs and metrics datastream', async function () {
const resLogsDatastream = await es.transport.request({
method: 'GET',
path: `/_data_stream/${logsTemplateName}-default`,
});
const resMetricsDatastream = await es.transport.request({
method: 'GET',
path: `/_data_stream/${metricsTemplateName}-default`,
namespaces.forEach(async (namespace) => {
const resLogsDatastream = await es.transport.request({
method: 'GET',
path: `/_data_stream/${logsTemplateName}-${namespace}`,
});
const resMetricsDatastream = await es.transport.request({
method: 'GET',
path: `/_data_stream/${metricsTemplateName}-${namespace}`,
});
expect(resLogsDatastream.body.data_streams.length).equal(1);
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(1);
expect(resMetricsDatastream.body.data_streams.length).equal(1);
expect(resMetricsDatastream.body.data_streams[0].indices.length).equal(1);
});
expect(resLogsDatastream.body.data_streams.length).equal(1);
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(1);
expect(resMetricsDatastream.body.data_streams.length).equal(1);
expect(resMetricsDatastream.body.data_streams[0].indices.length).equal(1);
});

it('after update, it should have rolled over logs datastream because mappings are not compatible and not metrics', async function () {
await installPackage(pkgUpdateKey);
const resLogsDatastream = await es.transport.request({
method: 'GET',
path: `/_data_stream/${logsTemplateName}-default`,
});
const resMetricsDatastream = await es.transport.request({
method: 'GET',
path: `/_data_stream/${metricsTemplateName}-default`,
namespaces.forEach(async (namespace) => {
const resLogsDatastream = await es.transport.request({
method: 'GET',
path: `/_data_stream/${logsTemplateName}-${namespace}`,
});
const resMetricsDatastream = await es.transport.request({
method: 'GET',
path: `/_data_stream/${metricsTemplateName}-${namespace}`,
});
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(2);
expect(resMetricsDatastream.body.data_streams[0].indices.length).equal(1);
});
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(2);
expect(resMetricsDatastream.body.data_streams[0].indices.length).equal(1);
});

it('should be able to upgrade a package after a rollover', async function () {
await es.transport.request({
method: 'POST',
path: `/${logsTemplateName}-default/_rollover`,
});
const resLogsDatastream = await es.transport.request({
method: 'GET',
path: `/_data_stream/${logsTemplateName}-default`,
namespaces.forEach(async (namespace) => {
await es.transport.request({
method: 'POST',
path: `/${logsTemplateName}-${namespace}/_rollover`,
});
const resLogsDatastream = await es.transport.request({
method: 'GET',
path: `/_data_stream/${logsTemplateName}-${namespace}`,
});
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(2);
});
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(2);
await installPackage(pkgUpdateKey);
});
});
Expand Down

0 comments on commit cb16a5c

Please sign in to comment.