Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Ingest Manager] rollover data stream when index template mappings are not compatible #69180

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion x-pack/plugins/ingest_manager/common/types/models/epm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ export interface IndexTemplate {
index_patterns: string[];
template: {
settings: any;
mappings: object;
mappings: any;
aliases: object;
};
data_stream: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,11 +330,15 @@ const getIndices = async (
template: TemplateRef
): Promise<CurrentIndex[] | undefined> => {
const { templateName, indexTemplate } = template;
const res = await callCluster('search', getIndexQuery(templateName));
const indices: any[] = res?.aggregations?.index.buckets;
if (indices) {
return indices.map((index) => ({
indexName: index.key,
// Until ES provides a way to update mappings of a data stream
// get the last index of the data stream, which is the current write index
const res = await callCluster('transport.request', {
method: 'GET',
path: `/_data_stream/${templateName}-*`,
});
if (res.length) {
return res.map((datastream: any) => ({
indexName: datastream.indices[datastream.indices.length - 1].index_name,
indexTemplate,
}));
}
Expand All @@ -359,18 +363,40 @@ const updateExistingIndex = async ({
indexTemplate: IndexTemplate;
}) => {
const { settings, mappings } = indexTemplate.template;

// for now, remove from object so as not to update stream or dataset properties of the index until type and name
// are added in https://github.com/elastic/kibana/issues/66551. namespace value we will continue
// to skip updating and assume the value in the index mapping is correct
delete mappings.properties.stream;
delete mappings.properties.dataset;

// get the dataset values from the index template to compose data stream name
const indexMappings = await getIndexMappings(indexName, callCluster);
const dataset = indexMappings[indexName].mappings.properties.dataset.properties;
if (!dataset.type.value || !dataset.name.value || !dataset.namespace.value)
throw new Error(`dataset values are missing from the index template ${indexName}`);
const dataStreamName = `${dataset.type.value}-${dataset.name.value}-${dataset.namespace.value}`;

// try to update the mappings first
// for now we assume updates are compatible
try {
await callCluster('indices.putMapping', {
index: indexName,
body: mappings,
});
// if update fails, rollover data stream
} catch (err) {
throw new Error('incompatible mappings update');
try {
const path = `/${dataStreamName}/_rollover`;
await callCluster('transport.request', {
method: 'POST',
path,
});
} catch (error) {
throw new Error(`cannot rollover data stream ${dataStreamName}`);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about adding the error message that comes from elasticsearch to the new error message? This can be helpful for troubleshooting.

}
}
// update settings after mappings was successful to ensure
// pointing to theme new pipeline is safe
// pointing to the new pipeline is safe
// for now, only update the pipeline
if (!settings.index.default_pipeline) return;
try {
Expand All @@ -379,36 +405,17 @@ const updateExistingIndex = async ({
body: { index: { default_pipeline: settings.index.default_pipeline } },
});
} catch (err) {
throw new Error('incompatible settings update');
throw new Error(`could not update index template settings for ${indexName}`);
}
};

const getIndexQuery = (templateName: string) => ({
index: `${templateName}-*`,
size: 0,
body: {
query: {
bool: {
must: [
{
exists: {
field: 'dataset.namespace',
},
},
{
exists: {
field: 'dataset.name',
},
},
],
},
},
aggs: {
index: {
terms: {
field: '_index',
},
},
},
},
});
const getIndexMappings = async (indexName: string, callCluster: CallESAsCurrentUser) => {
try {
const indexMappings = await callCluster('indices.getMapping', {
index: indexName,
});
return indexMappings;
} catch (err) {
throw new Error(`could not get mapping from ${indexName}`);
}
};