Skip to content

Commit

Permalink
[Ingest Manager] rollover data stream when index template mappings ar…
Browse files Browse the repository at this point in the history
…e not compatible (#69180) (#70318)

* rollover data stream when index template mappings are not compatible

* update error messages

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
  • Loading branch information
neptunian and elasticmachine authored Jun 30, 2020
1 parent 2bc8b63 commit 1ea2f13
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 39 deletions.
2 changes: 1 addition & 1 deletion x-pack/plugins/ingest_manager/common/types/models/epm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ export interface IndexTemplate {
index_patterns: string[];
template: {
settings: any;
mappings: object;
mappings: any;
aliases: object;
};
data_stream: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,11 +330,15 @@ const getIndices = async (
template: TemplateRef
): Promise<CurrentIndex[] | undefined> => {
const { templateName, indexTemplate } = template;
const res = await callCluster('search', getIndexQuery(templateName));
const indices: any[] = res?.aggregations?.index.buckets;
if (indices) {
return indices.map((index) => ({
indexName: index.key,
// Until ES provides a way to update mappings of a data stream
// get the last index of the data stream, which is the current write index
const res = await callCluster('transport.request', {
method: 'GET',
path: `/_data_stream/${templateName}-*`,
});
if (res.length) {
return res.map((datastream: any) => ({
indexName: datastream.indices[datastream.indices.length - 1].index_name,
indexTemplate,
}));
}
Expand All @@ -359,18 +363,40 @@ const updateExistingIndex = async ({
indexTemplate: IndexTemplate;
}) => {
const { settings, mappings } = indexTemplate.template;

// for now, remove from object so as not to update stream or dataset properties of the index until type and name
// are added in https://github.com/elastic/kibana/issues/66551. namespace value we will continue
// to skip updating and assume the value in the index mapping is correct
delete mappings.properties.stream;
delete mappings.properties.dataset;

// get the dataset values from the index template to compose data stream name
const indexMappings = await getIndexMappings(indexName, callCluster);
const dataset = indexMappings[indexName].mappings.properties.dataset.properties;
if (!dataset.type.value || !dataset.name.value || !dataset.namespace.value)
throw new Error(`dataset values are missing from the index template ${indexName}`);
const dataStreamName = `${dataset.type.value}-${dataset.name.value}-${dataset.namespace.value}`;

// try to update the mappings first
// for now we assume updates are compatible
try {
await callCluster('indices.putMapping', {
index: indexName,
body: mappings,
});
// if update fails, rollover data stream
} catch (err) {
throw new Error('incompatible mappings update');
try {
const path = `/${dataStreamName}/_rollover`;
await callCluster('transport.request', {
method: 'POST',
path,
});
} catch (error) {
throw new Error(`cannot rollover data stream ${dataStreamName}`);
}
}
// update settings after mappings was successful to ensure
// pointing to theme new pipeline is safe
// pointing to the new pipeline is safe
// for now, only update the pipeline
if (!settings.index.default_pipeline) return;
try {
Expand All @@ -379,36 +405,17 @@ const updateExistingIndex = async ({
body: { index: { default_pipeline: settings.index.default_pipeline } },
});
} catch (err) {
throw new Error('incompatible settings update');
throw new Error(`could not update index template settings for ${indexName}`);
}
};

const getIndexQuery = (templateName: string) => ({
index: `${templateName}-*`,
size: 0,
body: {
query: {
bool: {
must: [
{
exists: {
field: 'dataset.namespace',
},
},
{
exists: {
field: 'dataset.name',
},
},
],
},
},
aggs: {
index: {
terms: {
field: '_index',
},
},
},
},
});
const getIndexMappings = async (indexName: string, callCluster: CallESAsCurrentUser) => {
try {
const indexMappings = await callCluster('indices.getMapping', {
index: indexName,
});
return indexMappings;
} catch (err) {
throw new Error(`could not get mapping from ${indexName}`);
}
};

0 comments on commit 1ea2f13

Please sign in to comment.