Skip to content

Commit

Permalink
esArchiver datastream support (#132853)
Browse files Browse the repository at this point in the history
* aliases fallback

* nasty datastream support implementation

* datastreams stats method

* update filter stream

* datastream support for unload action

* create-index datastream support

* index records data stream support

* doc records data streams support

* [CI] Auto-commit changed files from 'node scripts/eslint --no-cache --fix'

* lint

* pull composable templates

* set data_stream as a separate property on documents

* force create bulk operation when datastream record

* [CI] Auto-commit changed files from 'node scripts/eslint --no-cache --fix'

* lint

* getIndexTemplate tests

* [CI] Auto-commit changed files from 'node scripts/precommit_hook.js --ref HEAD~1..HEAD --fix'

* share cache across transform executions

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
  • Loading branch information
klacabane and kibanamachine authored Jun 2, 2022
1 parent e921693 commit 4c4f0f5
Show file tree
Hide file tree
Showing 20 changed files with 476 additions and 45 deletions.
2 changes: 1 addition & 1 deletion packages/kbn-es-archiver/src/actions/save.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ export async function saveAction({
// export and save the matching indices to mappings.json
createPromiseFromStreams([
createListStream(indices),
createGenerateIndexRecordsStream({ client, stats, keepIndexNames }),
createGenerateIndexRecordsStream({ client, stats, keepIndexNames, log }),
...createFormatArchiveStreams(),
createWriteStream(resolve(outputDir, 'mappings.json')),
] as [Readable, ...Writable[]]),
Expand Down
2 changes: 1 addition & 1 deletion packages/kbn-es-archiver/src/actions/unload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export async function unloadAction({
await createPromiseFromStreams([
createReadStream(resolve(inputDir, filename)) as Readable,
...createParseArchiveStreams({ gzip: isGzip(filename) }),
createFilterRecordsStream('index'),
createFilterRecordsStream((record) => ['index', 'data_stream'].includes(record.type)),
createDeleteIndexStream(client, stats, log),
] as [Readable, ...Writable[]]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,29 @@ interface SearchResponses {
}>;
}

function createMockClient(responses: SearchResponses) {
function createMockClient(responses: SearchResponses, hasDataStreams = false) {
// TODO: replace with proper mocked client
const client: any = {
helpers: {
scrollSearch: jest.fn(function* ({ index }) {
if (hasDataStreams) {
index = `.ds-${index}`;
}

while (responses[index] && responses[index].length) {
yield responses[index].shift()!;
}
}),
},
indices: {
get: jest.fn(async ({ index }) => {
return { [index]: { data_stream: hasDataStreams && index.substring(4) } };
}),
getDataStream: jest.fn(async ({ name }) => {
if (!hasDataStreams) return { data_streams: [] };
return { data_streams: [{ name }] };
}),
},
};
return client;
}
Expand Down Expand Up @@ -217,6 +230,35 @@ describe('esArchiver: createGenerateDocRecordsStream()', () => {
`);
});

it('supports data streams', async () => {
const hits = [
{ _index: '.ds-foo-datastream', _id: '0', _source: {} },
{ _index: '.ds-foo-datastream', _id: '1', _source: {} },
];
const responses = {
'.ds-foo-datastream': [{ body: { hits: { hits, total: hits.length } } }],
};
const client = createMockClient(responses, true);

const stats = createStats('test', log);
const progress = new Progress();

const results = await createPromiseFromStreams([
createListStream(['foo-datastream']),
createGenerateDocRecordsStream({
client,
stats,
progress,
}),
createMapStream((record: any) => {
return `${record.value.data_stream}:${record.value.id}`;
}),
createConcatStream([]),
]);

expect(results).toEqual(['foo-datastream:0', 'foo-datastream:1']);
});

describe('keepIndexNames', () => {
it('changes .kibana* index names if keepIndexNames is not enabled', async () => {
const hits = [{ _index: '.kibana_7.16.0_001', _id: '0', _source: {} }];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ export function createGenerateDocRecordsStream({
}
);

const hasDatastreams =
(await client.indices.getDataStream({ name: index })).data_streams.length > 0;
const indexToDatastream = new Map();

let remainingHits: number | null = null;

for await (const resp of interator) {
Expand All @@ -57,14 +61,25 @@ export function createGenerateDocRecordsStream({

for (const hit of resp.body.hits.hits) {
remainingHits -= 1;
stats.archivedDoc(hit._index);

if (hasDatastreams && !indexToDatastream.has(hit._index)) {
const {
[hit._index]: { data_stream: dataStream },
} = await client.indices.get({ index: hit._index, filter_path: ['*.data_stream'] });
indexToDatastream.set(hit._index, dataStream);
}

const dataStream = indexToDatastream.get(hit._index);
stats.archivedDoc(dataStream || hit._index);

this.push({
type: 'doc',
value: {
// if keepIndexNames is false, rewrite the .kibana_* index to .kibana_1 so that
// when it is loaded it can skip migration, if possible
index:
hit._index.startsWith('.kibana') && !keepIndexNames ? '.kibana_1' : hit._index,
data_stream: dataStream,
id: hit._id,
source: hit._source,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,55 @@ describe('bulk helper onDocument param', () => {
createIndexDocRecordsStream(client as any, stats, progress, true),
]);
});

it('returns create ops for data stream documents', async () => {
const records = [
{
type: 'doc',
value: {
index: '.ds-foo-ds',
data_stream: 'foo-ds',
id: '0',
source: {
hello: 'world',
},
},
},
{
type: 'doc',
value: {
index: '.ds-foo-ds',
data_stream: 'foo-ds',
id: '1',
source: {
hello: 'world',
},
},
},
];
expect.assertions(records.length);

const client = new MockClient();
client.helpers.bulk.mockImplementation(async ({ datasource, onDocument }) => {
for (const d of datasource) {
const op = onDocument(d);
expect(op).toEqual({
create: {
_index: 'foo-ds',
_id: expect.stringMatching(/^\d$/),
},
});
}
});

const stats = createStats('test', log);
const progress = new Progress();

await createPromiseFromStreams([
createListStream(records),
createIndexDocRecordsStream(client as any, stats, progress),
]);
});
});

describe('bulk helper onDrop param', () => {
Expand Down
15 changes: 11 additions & 4 deletions packages/kbn-es-archiver/src/lib/docs/index_doc_records_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,19 @@ import { Stats } from '../stats';
import { Progress } from '../progress';
import { ES_CLIENT_HEADERS } from '../../client_headers';

enum BulkOperation {
Create = 'create',
Index = 'index',
}

export function createIndexDocRecordsStream(
client: Client,
stats: Stats,
progress: Progress,
useCreate: boolean = false
) {
async function indexDocs(docs: any[]) {
const operation = useCreate === true ? 'create' : 'index';
const operation = useCreate === true ? BulkOperation.Create : BulkOperation.Index;
const ops = new WeakMap<any, any>();
const errors: string[] = [];

Expand All @@ -29,9 +34,11 @@ export function createIndexDocRecordsStream(
retries: 5,
datasource: docs.map((doc) => {
const body = doc.source;
const op = doc.data_stream ? BulkOperation.Create : operation;
const index = doc.data_stream || doc.index;
ops.set(body, {
[operation]: {
_index: doc.index,
[op]: {
_index: index,
_id: doc.id,
},
});
Expand All @@ -56,7 +63,7 @@ export function createIndexDocRecordsStream(
}

for (const doc of docs) {
stats.indexedDoc(doc.index);
stats.indexedDoc(doc.data_stream || doc.index);
}
}

Expand Down
2 changes: 2 additions & 0 deletions packages/kbn-es-archiver/src/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,5 @@ export {
export { readDirectory } from './directory';

export { Progress } from './progress';

export { getIndexTemplate } from './index_template';
105 changes: 105 additions & 0 deletions packages/kbn-es-archiver/src/lib/index_template.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { Client } from '@elastic/elasticsearch';

import sinon from 'sinon';
import { getIndexTemplate } from './index_template';

describe('esArchiver: index template', () => {
describe('getIndexTemplate', () => {
it('returns the index template', async () => {
const client = {
indices: {
getIndexTemplate: sinon.stub().resolves({
index_templates: [
{
index_template: {
index_patterns: ['pattern-*'],
template: {
mappings: { properties: { foo: { type: 'keyword' } } },
},
priority: 500,
composed_of: [],
data_stream: { hidden: false },
},
},
],
}),
},
} as unknown as Client;

const template = await getIndexTemplate(client, 'template-foo');

expect(template).toEqual({
name: 'template-foo',
index_patterns: ['pattern-*'],
template: {
mappings: { properties: { foo: { type: 'keyword' } } },
},
priority: 500,
data_stream: { hidden: false },
});
});

it('resolves component templates', async () => {
const client = {
indices: {
getIndexTemplate: sinon.stub().resolves({
index_templates: [
{
index_template: {
index_patterns: ['pattern-*'],
composed_of: ['the-settings', 'the-mappings'],
},
},
],
}),
},
cluster: {
getComponentTemplate: sinon
.stub()
.onFirstCall()
.resolves({
component_templates: [
{
component_template: {
template: {
aliases: { 'foo-alias': {} },
},
},
},
],
})
.onSecondCall()
.resolves({
component_templates: [
{
component_template: {
template: {
mappings: { properties: { foo: { type: 'keyword' } } },
},
},
},
],
}),
},
} as unknown as Client;

const template = await getIndexTemplate(client, 'template-foo');

expect(template).toEqual({
name: 'template-foo',
index_patterns: ['pattern-*'],
template: {
aliases: { 'foo-alias': {} },
mappings: { properties: { foo: { type: 'keyword' } } },
},
});
});
});
});
37 changes: 37 additions & 0 deletions packages/kbn-es-archiver/src/lib/index_template.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import { merge } from 'lodash';
import type { Client } from '@elastic/elasticsearch';

import { ES_CLIENT_HEADERS } from '../client_headers';

export const getIndexTemplate = async (client: Client, templateName: string) => {
const { index_templates: indexTemplates } = await client.indices.getIndexTemplate(
{ name: templateName },
{ headers: ES_CLIENT_HEADERS }
);
const {
index_template: { template, composed_of: composedOf = [], ...indexTemplate },
} = indexTemplates[0];

const components = await Promise.all(
composedOf.map(async (component) => {
const { component_templates: componentTemplates } = await client.cluster.getComponentTemplate(
{ name: component }
);
return componentTemplates[0].component_template.template;
})
);

return {
...indexTemplate,
name: templateName,
template: merge(template, ...components),
};
};
Loading

0 comments on commit 4c4f0f5

Please sign in to comment.