diff --git a/packages/kbn-es-archiver/src/actions/save.ts b/packages/kbn-es-archiver/src/actions/save.ts index 16f0cbc3c18462..9fcbe45946eb77 100644 --- a/packages/kbn-es-archiver/src/actions/save.ts +++ b/packages/kbn-es-archiver/src/actions/save.ts @@ -52,7 +52,7 @@ export async function saveAction({ // export and save the matching indices to mappings.json createPromiseFromStreams([ createListStream(indices), - createGenerateIndexRecordsStream({ client, stats, keepIndexNames }), + createGenerateIndexRecordsStream({ client, stats, keepIndexNames, log }), ...createFormatArchiveStreams(), createWriteStream(resolve(outputDir, 'mappings.json')), ] as [Readable, ...Writable[]]), diff --git a/packages/kbn-es-archiver/src/actions/unload.ts b/packages/kbn-es-archiver/src/actions/unload.ts index 2d4b16d7186894..e564bcbb1a703e 100644 --- a/packages/kbn-es-archiver/src/actions/unload.ts +++ b/packages/kbn-es-archiver/src/actions/unload.ts @@ -45,7 +45,7 @@ export async function unloadAction({ await createPromiseFromStreams([ createReadStream(resolve(inputDir, filename)) as Readable, ...createParseArchiveStreams({ gzip: isGzip(filename) }), - createFilterRecordsStream('index'), + createFilterRecordsStream((record) => ['index', 'data_stream'].includes(record.type)), createDeleteIndexStream(client, stats, log), ] as [Readable, ...Writable[]]); } diff --git a/packages/kbn-es-archiver/src/lib/docs/generate_doc_records_stream.test.ts b/packages/kbn-es-archiver/src/lib/docs/generate_doc_records_stream.test.ts index e102ac50c3876b..386d6d4a088ce2 100644 --- a/packages/kbn-es-archiver/src/lib/docs/generate_doc_records_stream.test.ts +++ b/packages/kbn-es-archiver/src/lib/docs/generate_doc_records_stream.test.ts @@ -36,16 +36,29 @@ interface SearchResponses { }>; } -function createMockClient(responses: SearchResponses) { +function createMockClient(responses: SearchResponses, hasDataStreams = false) { // TODO: replace with proper mocked client const client: any = { helpers: { scrollSearch: jest.fn(function* ({ index }) { + if (hasDataStreams) { + index = `.ds-${index}`; + } + while (responses[index] && responses[index].length) { yield responses[index].shift()!; } }), }, + indices: { + get: jest.fn(async ({ index }) => { + return { [index]: { data_stream: hasDataStreams && index.substring(4) } }; + }), + getDataStream: jest.fn(async ({ name }) => { + if (!hasDataStreams) return { data_streams: [] }; + return { data_streams: [{ name }] }; + }), + }, }; return client; } @@ -217,6 +230,35 @@ describe('esArchiver: createGenerateDocRecordsStream()', () => { `); }); + it('supports data streams', async () => { + const hits = [ + { _index: '.ds-foo-datastream', _id: '0', _source: {} }, + { _index: '.ds-foo-datastream', _id: '1', _source: {} }, + ]; + const responses = { + '.ds-foo-datastream': [{ body: { hits: { hits, total: hits.length } } }], + }; + const client = createMockClient(responses, true); + + const stats = createStats('test', log); + const progress = new Progress(); + + const results = await createPromiseFromStreams([ + createListStream(['foo-datastream']), + createGenerateDocRecordsStream({ + client, + stats, + progress, + }), + createMapStream((record: any) => { + return `${record.value.data_stream}:${record.value.id}`; + }), + createConcatStream([]), + ]); + + expect(results).toEqual(['foo-datastream:0', 'foo-datastream:1']); + }); + describe('keepIndexNames', () => { it('changes .kibana* index names if keepIndexNames is not enabled', async () => { const hits = [{ _index: '.kibana_7.16.0_001', _id: '0', _source: {} }]; diff --git a/packages/kbn-es-archiver/src/lib/docs/generate_doc_records_stream.ts b/packages/kbn-es-archiver/src/lib/docs/generate_doc_records_stream.ts index 40907bd0af2386..6e3310a7347e73 100644 --- a/packages/kbn-es-archiver/src/lib/docs/generate_doc_records_stream.ts +++ b/packages/kbn-es-archiver/src/lib/docs/generate_doc_records_stream.ts @@ -47,6 +47,10 @@ export function createGenerateDocRecordsStream({ } ); + const hasDatastreams = + (await client.indices.getDataStream({ name: index })).data_streams.length > 0; + const indexToDatastream = new Map(); + let remainingHits: number | null = null; for await (const resp of interator) { @@ -57,7 +61,17 @@ export function createGenerateDocRecordsStream({ for (const hit of resp.body.hits.hits) { remainingHits -= 1; - stats.archivedDoc(hit._index); + + if (hasDatastreams && !indexToDatastream.has(hit._index)) { + const { + [hit._index]: { data_stream: dataStream }, + } = await client.indices.get({ index: hit._index, filter_path: ['*.data_stream'] }); + indexToDatastream.set(hit._index, dataStream); + } + + const dataStream = indexToDatastream.get(hit._index); + stats.archivedDoc(dataStream || hit._index); + this.push({ type: 'doc', value: { @@ -65,6 +79,7 @@ export function createGenerateDocRecordsStream({ // when it is loaded it can skip migration, if possible index: hit._index.startsWith('.kibana') && !keepIndexNames ? '.kibana_1' : hit._index, + data_stream: dataStream, id: hit._id, source: hit._source, }, diff --git a/packages/kbn-es-archiver/src/lib/docs/index_doc_records_stream.test.ts b/packages/kbn-es-archiver/src/lib/docs/index_doc_records_stream.test.ts index 5dc9b4b7bd8ddb..c1bb94ee134989 100644 --- a/packages/kbn-es-archiver/src/lib/docs/index_doc_records_stream.test.ts +++ b/packages/kbn-es-archiver/src/lib/docs/index_doc_records_stream.test.ts @@ -243,6 +243,55 @@ describe('bulk helper onDocument param', () => { createIndexDocRecordsStream(client as any, stats, progress, true), ]); }); + + it('returns create ops for data stream documents', async () => { + const records = [ + { + type: 'doc', + value: { + index: '.ds-foo-ds', + data_stream: 'foo-ds', + id: '0', + source: { + hello: 'world', + }, + }, + }, + { + type: 'doc', + value: { + index: '.ds-foo-ds', + data_stream: 'foo-ds', + id: '1', + source: { + hello: 'world', + }, + }, + }, + ]; + expect.assertions(records.length); + + const client = new MockClient(); + client.helpers.bulk.mockImplementation(async ({ datasource, onDocument }) => { + for (const d of datasource) { + const op = onDocument(d); + expect(op).toEqual({ + create: { + _index: 'foo-ds', + _id: expect.stringMatching(/^\d$/), + }, + }); + } + }); + + const stats = createStats('test', log); + const progress = new Progress(); + + await createPromiseFromStreams([ + createListStream(records), + createIndexDocRecordsStream(client as any, stats, progress), + ]); + }); }); describe('bulk helper onDrop param', () => { diff --git a/packages/kbn-es-archiver/src/lib/docs/index_doc_records_stream.ts b/packages/kbn-es-archiver/src/lib/docs/index_doc_records_stream.ts index 749bfd08723534..40e1c1932aeeec 100644 --- a/packages/kbn-es-archiver/src/lib/docs/index_doc_records_stream.ts +++ b/packages/kbn-es-archiver/src/lib/docs/index_doc_records_stream.ts @@ -13,6 +13,11 @@ import { Stats } from '../stats'; import { Progress } from '../progress'; import { ES_CLIENT_HEADERS } from '../../client_headers'; +enum BulkOperation { + Create = 'create', + Index = 'index', +} + export function createIndexDocRecordsStream( client: Client, stats: Stats, @@ -20,7 +25,7 @@ export function createIndexDocRecordsStream( useCreate: boolean = false ) { async function indexDocs(docs: any[]) { - const operation = useCreate === true ? 'create' : 'index'; + const operation = useCreate === true ? BulkOperation.Create : BulkOperation.Index; const ops = new WeakMap(); const errors: string[] = []; @@ -29,9 +34,11 @@ export function createIndexDocRecordsStream( retries: 5, datasource: docs.map((doc) => { const body = doc.source; + const op = doc.data_stream ? BulkOperation.Create : operation; + const index = doc.data_stream || doc.index; ops.set(body, { - [operation]: { - _index: doc.index, + [op]: { + _index: index, _id: doc.id, }, }); @@ -56,7 +63,7 @@ export function createIndexDocRecordsStream( } for (const doc of docs) { - stats.indexedDoc(doc.index); + stats.indexedDoc(doc.data_stream || doc.index); } } diff --git a/packages/kbn-es-archiver/src/lib/index.ts b/packages/kbn-es-archiver/src/lib/index.ts index ee37591e1f2c30..8a857fb24002a7 100644 --- a/packages/kbn-es-archiver/src/lib/index.ts +++ b/packages/kbn-es-archiver/src/lib/index.ts @@ -33,3 +33,5 @@ export { export { readDirectory } from './directory'; export { Progress } from './progress'; + +export { getIndexTemplate } from './index_template'; diff --git a/packages/kbn-es-archiver/src/lib/index_template.test.ts b/packages/kbn-es-archiver/src/lib/index_template.test.ts new file mode 100644 index 00000000000000..b8f5330663ee15 --- /dev/null +++ b/packages/kbn-es-archiver/src/lib/index_template.test.ts @@ -0,0 +1,105 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +import type { Client } from '@elastic/elasticsearch'; + +import sinon from 'sinon'; +import { getIndexTemplate } from './index_template'; + +describe('esArchiver: index template', () => { + describe('getIndexTemplate', () => { + it('returns the index template', async () => { + const client = { + indices: { + getIndexTemplate: sinon.stub().resolves({ + index_templates: [ + { + index_template: { + index_patterns: ['pattern-*'], + template: { + mappings: { properties: { foo: { type: 'keyword' } } }, + }, + priority: 500, + composed_of: [], + data_stream: { hidden: false }, + }, + }, + ], + }), + }, + } as unknown as Client; + + const template = await getIndexTemplate(client, 'template-foo'); + + expect(template).toEqual({ + name: 'template-foo', + index_patterns: ['pattern-*'], + template: { + mappings: { properties: { foo: { type: 'keyword' } } }, + }, + priority: 500, + data_stream: { hidden: false }, + }); + }); + + it('resolves component templates', async () => { + const client = { + indices: { + getIndexTemplate: sinon.stub().resolves({ + index_templates: [ + { + index_template: { + index_patterns: ['pattern-*'], + composed_of: ['the-settings', 'the-mappings'], + }, + }, + ], + }), + }, + cluster: { + getComponentTemplate: sinon + .stub() + .onFirstCall() + .resolves({ + component_templates: [ + { + component_template: { + template: { + aliases: { 'foo-alias': {} }, + }, + }, + }, + ], + }) + .onSecondCall() + .resolves({ + component_templates: [ + { + component_template: { + template: { + mappings: { properties: { foo: { type: 'keyword' } } }, + }, + }, + }, + ], + }), + }, + } as unknown as Client; + + const template = await getIndexTemplate(client, 'template-foo'); + + expect(template).toEqual({ + name: 'template-foo', + index_patterns: ['pattern-*'], + template: { + aliases: { 'foo-alias': {} }, + mappings: { properties: { foo: { type: 'keyword' } } }, + }, + }); + }); + }); +}); diff --git a/packages/kbn-es-archiver/src/lib/index_template.ts b/packages/kbn-es-archiver/src/lib/index_template.ts new file mode 100644 index 00000000000000..9d67add9757dbe --- /dev/null +++ b/packages/kbn-es-archiver/src/lib/index_template.ts @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { merge } from 'lodash'; +import type { Client } from '@elastic/elasticsearch'; + +import { ES_CLIENT_HEADERS } from '../client_headers'; + +export const getIndexTemplate = async (client: Client, templateName: string) => { + const { index_templates: indexTemplates } = await client.indices.getIndexTemplate( + { name: templateName }, + { headers: ES_CLIENT_HEADERS } + ); + const { + index_template: { template, composed_of: composedOf = [], ...indexTemplate }, + } = indexTemplates[0]; + + const components = await Promise.all( + composedOf.map(async (component) => { + const { component_templates: componentTemplates } = await client.cluster.getComponentTemplate( + { name: component } + ); + return componentTemplates[0].component_template.template; + }) + ); + + return { + ...indexTemplate, + name: templateName, + template: merge(template, ...components), + }; +}; diff --git a/packages/kbn-es-archiver/src/lib/indices/__mocks__/stubs.ts b/packages/kbn-es-archiver/src/lib/indices/__mocks__/stubs.ts index c60c9201001743..1bfbc80f52a19f 100644 --- a/packages/kbn-es-archiver/src/lib/indices/__mocks__/stubs.ts +++ b/packages/kbn-es-archiver/src/lib/indices/__mocks__/stubs.ts @@ -19,7 +19,9 @@ export const createStubStats = (): StubStats => ({ createdIndex: sinon.stub(), createdAliases: sinon.stub(), + createdDataStream: sinon.stub(), deletedIndex: sinon.stub(), + deletedDataStream: sinon.stub(), skippedIndex: sinon.stub(), archivedIndex: sinon.stub(), getTestSummary() { @@ -47,6 +49,11 @@ export const createStubIndexRecord = (index: string, aliases = {}) => ({ value: { index, aliases }, }); +export const createStubDataStreamRecord = (dataStream: string, template: string) => ({ + type: 'data_stream', + value: { data_stream: dataStream, template: { name: template } }, +}); + export const createStubDocRecord = (index: string, id: number) => ({ type: 'doc', value: { index, id }, @@ -140,5 +147,10 @@ export const createStubClient = ( exists: sinon.spy(async () => { throw new Error('Do not use indices.exists(). React to errors instead.'); }), + + createDataStream: sinon.spy(async ({ name }) => {}), + deleteDataStream: sinon.spy(async ({ name }) => {}), + putIndexTemplate: sinon.spy(async ({ name }) => {}), + deleteIndexTemplate: sinon.spy(async ({ name }) => {}), }, } as any); diff --git a/packages/kbn-es-archiver/src/lib/indices/create_index_stream.test.ts b/packages/kbn-es-archiver/src/lib/indices/create_index_stream.test.ts index 615555b405e445..15efa539217437 100644 --- a/packages/kbn-es-archiver/src/lib/indices/create_index_stream.test.ts +++ b/packages/kbn-es-archiver/src/lib/indices/create_index_stream.test.ts @@ -17,6 +17,7 @@ import { createCreateIndexStream } from './create_index_stream'; import { createStubStats, createStubIndexRecord, + createStubDataStreamRecord, createStubDocRecord, createStubClient, createStubLogger, @@ -171,6 +172,19 @@ describe('esArchiver: createCreateIndexStream()', () => { expect(output).toEqual(nonRecordValues); }); + + it('creates data streams', async () => { + const client = createStubClient(); + const stats = createStubStats(); + + await createPromiseFromStreams([ + createListStream([createStubDataStreamRecord('foo-datastream', 'foo-template')]), + createCreateIndexStream({ client, stats, log }), + ]); + + sinon.assert.calledOnce(client.indices.putIndexTemplate as sinon.SinonSpy); + sinon.assert.calledOnce(client.indices.createDataStream as sinon.SinonSpy); + }); }); describe('deleteKibanaIndices', () => { diff --git a/packages/kbn-es-archiver/src/lib/indices/create_index_stream.ts b/packages/kbn-es-archiver/src/lib/indices/create_index_stream.ts index 2ab53a2ca012c0..38f4bed755262d 100644 --- a/packages/kbn-es-archiver/src/lib/indices/create_index_stream.ts +++ b/packages/kbn-es-archiver/src/lib/indices/create_index_stream.ts @@ -13,15 +13,18 @@ import * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; import type { Client } from '@elastic/elasticsearch'; import { ToolingLog } from '@kbn/tooling-log'; +import { IndicesPutIndexTemplateRequest } from '@elastic/elasticsearch/lib/api/types'; import { Stats } from '../stats'; import { deleteKibanaIndices } from './kibana_index'; import { deleteIndex } from './delete_index'; +import { deleteDataStream } from './delete_data_stream'; import { ES_CLIENT_HEADERS } from '../../client_headers'; interface DocRecord { value: estypes.IndicesIndexState & { index: string; type: string; + template?: IndicesPutIndexTemplateRequest; }; } @@ -54,6 +57,43 @@ export function createCreateIndexStream({ stream.push(record); } + async function handleDataStream(record: DocRecord, attempts = 1) { + if (docsOnly) return; + + const { data_stream: dataStream, template } = record.value as { + data_stream: string; + template: IndicesPutIndexTemplateRequest; + }; + + try { + await client.indices.putIndexTemplate(template, { + headers: ES_CLIENT_HEADERS, + }); + + await client.indices.createDataStream( + { name: dataStream }, + { + headers: ES_CLIENT_HEADERS, + } + ); + stats.createdDataStream(dataStream, template.name, { template }); + } catch (err) { + if (err?.meta?.body?.error?.type !== 'resource_already_exists_exception' || attempts >= 3) { + throw err; + } + + if (skipExisting) { + skipDocsFromIndices.add(dataStream); + stats.skippedIndex(dataStream); + return; + } + + await deleteDataStream(client, dataStream, template.name); + stats.deletedDataStream(dataStream, template.name); + await handleDataStream(record, attempts + 1); + } + } + async function handleIndex(record: DocRecord) { const { index, settings, mappings, aliases } = record.value; const isKibanaTaskManager = index.startsWith('.kibana_task_manager'); @@ -134,6 +174,10 @@ export function createCreateIndexStream({ await handleIndex(record); break; + case 'data_stream': + await handleDataStream(record); + break; + case 'doc': await handleDoc(this, record); break; diff --git a/packages/kbn-es-archiver/src/lib/indices/delete_data_stream.ts b/packages/kbn-es-archiver/src/lib/indices/delete_data_stream.ts new file mode 100644 index 00000000000000..6aa68db4216f49 --- /dev/null +++ b/packages/kbn-es-archiver/src/lib/indices/delete_data_stream.ts @@ -0,0 +1,14 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import type { Client } from '@elastic/elasticsearch'; + +export async function deleteDataStream(client: Client, datastream: string, template: string) { + await client.indices.deleteDataStream({ name: datastream }); + await client.indices.deleteIndexTemplate({ name: template }); +} diff --git a/packages/kbn-es-archiver/src/lib/indices/delete_index_stream.test.ts b/packages/kbn-es-archiver/src/lib/indices/delete_index_stream.test.ts index 241d4a89445460..4917deab542d4a 100644 --- a/packages/kbn-es-archiver/src/lib/indices/delete_index_stream.test.ts +++ b/packages/kbn-es-archiver/src/lib/indices/delete_index_stream.test.ts @@ -16,6 +16,7 @@ import { createStubStats, createStubClient, createStubIndexRecord, + createStubDataStreamRecord, createStubLogger, } from './__mocks__/stubs'; @@ -51,4 +52,25 @@ describe('esArchiver: createDeleteIndexStream()', () => { sinon.assert.calledOnce(client.indices.delete as sinon.SinonSpy); sinon.assert.notCalled(client.indices.exists as sinon.SinonSpy); }); + + it('deletes data streams', async () => { + const stats = createStubStats(); + const client = createStubClient([]); + + await createPromiseFromStreams([ + createListStream([createStubDataStreamRecord('foo-datastream', 'foo-template')]), + createDeleteIndexStream(client, stats, log), + ]); + + sinon.assert.calledOnce(stats.deletedDataStream as sinon.SinonSpy); + sinon.assert.notCalled(client.indices.create as sinon.SinonSpy); + sinon.assert.calledOnce(client.indices.deleteDataStream as sinon.SinonSpy); + sinon.assert.calledWith(client.indices.deleteDataStream as sinon.SinonSpy, { + name: 'foo-datastream', + }); + sinon.assert.calledOnce(client.indices.deleteIndexTemplate as sinon.SinonSpy); + sinon.assert.calledWith(client.indices.deleteIndexTemplate as sinon.SinonSpy, { + name: 'foo-template', + }); + }); }); diff --git a/packages/kbn-es-archiver/src/lib/indices/delete_index_stream.ts b/packages/kbn-es-archiver/src/lib/indices/delete_index_stream.ts index 450d575181529b..c7633465ccc4cb 100644 --- a/packages/kbn-es-archiver/src/lib/indices/delete_index_stream.ts +++ b/packages/kbn-es-archiver/src/lib/indices/delete_index_stream.ts @@ -13,6 +13,7 @@ import { ToolingLog } from '@kbn/tooling-log'; import { Stats } from '../stats'; import { deleteIndex } from './delete_index'; import { cleanKibanaIndices } from './kibana_index'; +import { deleteDataStream } from './delete_data_stream'; export function createDeleteIndexStream(client: Client, stats: Stats, log: ToolingLog) { return new Transform({ @@ -20,7 +21,11 @@ export function createDeleteIndexStream(client: Client, stats: Stats, log: Tooli writableObjectMode: true, async transform(record, enc, callback) { try { - if (!record || record.type === 'index') { + if (!record) { + log.warning(`deleteIndexStream: empty index provided`); + return callback(); + } + if (record.type === 'index') { const { index } = record.value; if (index.startsWith('.kibana')) { @@ -28,6 +33,14 @@ export function createDeleteIndexStream(client: Client, stats: Stats, log: Tooli } else { await deleteIndex({ client, stats, log, index }); } + } else if (record.type === 'data_stream') { + const { + data_stream: dataStream, + template: { name }, + } = record.value; + + await deleteDataStream(client, dataStream, name); + stats.deletedDataStream(dataStream, name); } else { this.push(record); } diff --git a/packages/kbn-es-archiver/src/lib/indices/generate_index_records_stream.test.ts b/packages/kbn-es-archiver/src/lib/indices/generate_index_records_stream.test.ts index fbd351cea63a98..566760b0ddf883 100644 --- a/packages/kbn-es-archiver/src/lib/indices/generate_index_records_stream.test.ts +++ b/packages/kbn-es-archiver/src/lib/indices/generate_index_records_stream.test.ts @@ -9,10 +9,12 @@ import sinon from 'sinon'; import { createListStream, createPromiseFromStreams, createConcatStream } from '@kbn/utils'; -import { createStubClient, createStubStats } from './__mocks__/stubs'; +import { createStubClient, createStubLogger, createStubStats } from './__mocks__/stubs'; import { createGenerateIndexRecordsStream } from './generate_index_records_stream'; +const log = createStubLogger(); + describe('esArchiver: createGenerateIndexRecordsStream()', () => { it('consumes index names and queries for the mapping of each', async () => { const indices = ['index1', 'index2', 'index3', 'index4']; @@ -21,7 +23,7 @@ describe('esArchiver: createGenerateIndexRecordsStream()', () => { await createPromiseFromStreams([ createListStream(indices), - createGenerateIndexRecordsStream({ client, stats }), + createGenerateIndexRecordsStream({ client, stats, log }), ]); expect(stats.getTestSummary()).toEqual({ @@ -40,7 +42,7 @@ describe('esArchiver: createGenerateIndexRecordsStream()', () => { await createPromiseFromStreams([ createListStream(['index1']), - createGenerateIndexRecordsStream({ client, stats }), + createGenerateIndexRecordsStream({ client, stats, log }), ]); const params = (client.indices.get as sinon.SinonSpy).args[0][0]; @@ -58,7 +60,7 @@ describe('esArchiver: createGenerateIndexRecordsStream()', () => { const indexRecords = await createPromiseFromStreams([ createListStream(['index1', 'index2', 'index3']), - createGenerateIndexRecordsStream({ client, stats }), + createGenerateIndexRecordsStream({ client, stats, log }), createConcatStream([]), ]); @@ -83,7 +85,7 @@ describe('esArchiver: createGenerateIndexRecordsStream()', () => { const indexRecords = await createPromiseFromStreams([ createListStream(['index1']), - createGenerateIndexRecordsStream({ client, stats }), + createGenerateIndexRecordsStream({ client, stats, log }), createConcatStream([]), ]); @@ -107,7 +109,7 @@ describe('esArchiver: createGenerateIndexRecordsStream()', () => { const indexRecords = await createPromiseFromStreams([ createListStream(['.kibana_7.16.0_001']), - createGenerateIndexRecordsStream({ client, stats }), + createGenerateIndexRecordsStream({ client, stats, log }), createConcatStream([]), ]); @@ -122,7 +124,7 @@ describe('esArchiver: createGenerateIndexRecordsStream()', () => { const indexRecords = await createPromiseFromStreams([ createListStream(['.foo']), - createGenerateIndexRecordsStream({ client, stats }), + createGenerateIndexRecordsStream({ client, stats, log }), createConcatStream([]), ]); @@ -137,7 +139,7 @@ describe('esArchiver: createGenerateIndexRecordsStream()', () => { const indexRecords = await createPromiseFromStreams([ createListStream(['.kibana_7.16.0_001']), - createGenerateIndexRecordsStream({ client, stats, keepIndexNames: true }), + createGenerateIndexRecordsStream({ client, stats, log, keepIndexNames: true }), createConcatStream([]), ]); diff --git a/packages/kbn-es-archiver/src/lib/indices/generate_index_records_stream.ts b/packages/kbn-es-archiver/src/lib/indices/generate_index_records_stream.ts index e3efaa28516093..de32e93e273989 100644 --- a/packages/kbn-es-archiver/src/lib/indices/generate_index_records_stream.ts +++ b/packages/kbn-es-archiver/src/lib/indices/generate_index_records_stream.ts @@ -8,18 +8,28 @@ import type { Client } from '@elastic/elasticsearch'; import { Transform } from 'stream'; +import { ToolingLog } from '@kbn/tooling-log'; import { Stats } from '../stats'; import { ES_CLIENT_HEADERS } from '../../client_headers'; +import { getIndexTemplate } from '..'; + +const headers = { + headers: ES_CLIENT_HEADERS, +}; export function createGenerateIndexRecordsStream({ client, stats, keepIndexNames, + log, }: { client: Client; stats: Stats; keepIndexNames?: boolean; + log: ToolingLog; }) { + const seenDatastreams = new Set(); + return new Transform({ writableObjectMode: true, readableObjectMode: true, @@ -32,6 +42,7 @@ export function createGenerateIndexRecordsStream({ filter_path: [ '*.settings', '*.mappings', + '*.data_stream', // remove settings that aren't really settings '-*.settings.index.creation_date', '-*.settings.index.uuid', @@ -44,37 +55,58 @@ export function createGenerateIndexRecordsStream({ ], }, { - headers: ES_CLIENT_HEADERS, + ...headers, meta: true, } ) ).body; - for (const [index, { settings, mappings }] of Object.entries(resp)) { - const { - body: { - [index]: { aliases }, - }, - } = await client.indices.getAlias( - { index }, - { - headers: ES_CLIENT_HEADERS, - meta: true, + for (const [index, { data_stream: dataStream, settings, mappings }] of Object.entries( + resp + )) { + if (dataStream) { + log.info(`${index} will be saved as data_stream ${dataStream}`); + + if (seenDatastreams.has(dataStream)) { + log.info(`${dataStream} is already archived`); + continue; } - ); - stats.archivedIndex(index, { settings, mappings }); - this.push({ - type: 'index', - value: { - // if keepIndexNames is false, rewrite the .kibana_* index to .kibana_1 so that - // when it is loaded it can skip migration, if possible - index: index.startsWith('.kibana') && !keepIndexNames ? '.kibana_1' : index, - settings, - mappings, - aliases, - }, - }); + const { data_streams: dataStreams } = await client.indices.getDataStream( + { name: dataStream }, + headers + ); + const template = await getIndexTemplate(client, dataStreams[0].template); + + seenDatastreams.add(dataStream); + stats.archivedIndex(dataStream, { template }); + this.push({ + type: 'data_stream', + value: { + data_stream: dataStream, + template, + }, + }); + } else { + const { + body: { + [index]: { aliases }, + }, + } = await client.indices.getAlias({ index }, { ...headers, meta: true }); + + stats.archivedIndex(index, { settings, mappings }); + this.push({ + type: 'index', + value: { + // if keepIndexNames is false, rewrite the .kibana_* index to .kibana_1 so that + // when it is loaded it can skip migration, if possible + index: index.startsWith('.kibana') && !keepIndexNames ? '.kibana_1' : index, + settings, + mappings, + aliases, + }, + }); + } } callback(); diff --git a/packages/kbn-es-archiver/src/lib/records/filter_records_stream.test.ts b/packages/kbn-es-archiver/src/lib/records/filter_records_stream.test.ts index 506507ba0b9e6d..901664988d165d 100644 --- a/packages/kbn-es-archiver/src/lib/records/filter_records_stream.test.ts +++ b/packages/kbn-es-archiver/src/lib/records/filter_records_stream.test.ts @@ -26,7 +26,7 @@ describe('esArchiver: createFilterRecordsStream()', () => { }, chance.bool(), ]), - createFilterRecordsStream('type'), + createFilterRecordsStream((record) => record.type === 'type'), createConcatStream([]), ]); @@ -45,7 +45,7 @@ describe('esArchiver: createFilterRecordsStream()', () => { { type: chance.word({ length: 10 }), value: {} }, { type: chance.word({ length: 10 }), value: {} }, ]), - createFilterRecordsStream(type1), + createFilterRecordsStream((record) => record.type === type1), createConcatStream([]), ]); diff --git a/packages/kbn-es-archiver/src/lib/records/filter_records_stream.ts b/packages/kbn-es-archiver/src/lib/records/filter_records_stream.ts index 69ab06454f93b4..9ded38a6f2b58b 100644 --- a/packages/kbn-es-archiver/src/lib/records/filter_records_stream.ts +++ b/packages/kbn-es-archiver/src/lib/records/filter_records_stream.ts @@ -8,13 +8,13 @@ import { Transform } from 'stream'; -export function createFilterRecordsStream(type: string) { +export function createFilterRecordsStream(fn: (record: any) => boolean) { return new Transform({ writableObjectMode: true, readableObjectMode: true, transform(record, enc, callback) { - if (record && record.type === type) { + if (record && fn(record)) { callback(undefined, record); } else { callback(); diff --git a/packages/kbn-es-archiver/src/lib/stats.ts b/packages/kbn-es-archiver/src/lib/stats.ts index 9ff16d57b8661c..1b533a18acadef 100644 --- a/packages/kbn-es-archiver/src/lib/stats.ts +++ b/packages/kbn-es-archiver/src/lib/stats.ts @@ -83,6 +83,15 @@ export function createStats(name: string, log: ToolingLog) { info('Deleted existing index %j', index); } + /** + * Record that a data stream was deleted + * @param index + */ + public deletedDataStream(stream: string, template: string) { + getOrCreate(stream).deleted = true; + info('Deleted existing data stream %j with index template %j', stream, template); + } + /** * Record that an index was created * @param index @@ -95,6 +104,18 @@ export function createStats(name: string, log: ToolingLog) { }); } + /** + * Record that a data stream was created + * @param index + */ + public createdDataStream(stream: string, template: string, metadata: Record = {}) { + getOrCreate(stream).created = true; + info('Created data stream %j with index template %j', stream, template); + Object.keys(metadata).forEach((key) => { + debug('%j %s %j', stream, key, metadata[key]); + }); + } + /** * Record that an index was written to the archives * @param index