[SO Migration] fix reindex race on multi-instance mode #104516

Merged
merged 8 commits into from
Jul 7, 2021
Changes from 6 commits
8 changes: 5 additions & 3 deletions rfcs/text/0013_saved_object_migrations.md
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,11 @@ Note:
6. Set a write block on the source index. This prevents any further writes from outdated nodes.
7. Create a new temporary index `.kibana_7.10.0_reindex_temp` with `dynamic: false` on the top-level mappings so that any kind of document can be written to the index. This allows us to write untransformed documents to the index which might have fields which have been removed from the latest mappings defined by the plugin. Define minimal mappings for the `migrationVersion` and `type` fields so that we're still able to search for outdated documents that need to be transformed.
1. Ignore errors if the target index already exists.
8. Reindex the source index into the new temporary index.
1. Use `op_type=create` `conflicts=proceed` and `wait_for_completion=false` so that multiple instances can perform the reindex in parallel but only one write per document will succeed.
2. Wait for the reindex task to complete. If reindexing doesn’t complete within the 60s timeout, log a warning for visibility and poll again.
8. Reindex the source index into the new temporary index using a 'client-side' reindex, by reading batches of documents from the source, migrating them, and indexing them into the temp index.
1. Use `op_type=index` so that multiple instances can perform the reindex in parallel (last node running will override the documents, with no effect as the input data is the same)
2. Ignore `version_conflict_engine_exception` exceptions as they just mean that another node was indexing the same documents
3. If a `target_index_had_write_block` exception is encountered for all documents of a batch, assume that another node already completed the temporary index reindex, and jump to the next step
4. If a document transform throws an exception, add the document to a failure list and continue trying to transform all other documents (without writing them to the temp index). If any failures occurred, log the complete list of documents that failed to transform, then fail the migration.
Comment on lines +260 to +264
Contributor Author

We forgot to update the RFC for the client-side reindex. Fixed it, and added the new `target_index_had_write_block` special case.

9. Clone the temporary index into the target index `.kibana_7.10.0_001`. Since any further writes will only happen against the cloned target index, this prevents a lost delete from occurring where one instance finishes the migration and deletes a document and another instance's reindex operation re-creates the deleted document.
1. Set a write block on the temporary index
2. Clone the temporary index into the target index while specifying that the target index should have writes enabled.
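The failure handling described in step 8.4 can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the Kibana implementation: `RawDoc`, `TransformFailure`, `BatchOutcome`, `transformBatch`, and `docsToWrite` are hypothetical names introduced only for this example.

```typescript
// Hypothetical document shape, for illustration only.
interface RawDoc {
  _id: string;
  _source: Record<string, unknown>;
}

interface TransformFailure {
  id: string;
  message: string;
}

interface BatchOutcome {
  transformed: RawDoc[];
  failures: TransformFailure[];
}

// Transform every document in the batch. A failing document is recorded and
// the loop continues, so the complete list of failures can be logged later.
function transformBatch(
  docs: RawDoc[],
  transform: (doc: RawDoc) => RawDoc
): BatchOutcome {
  const transformed: RawDoc[] = [];
  const failures: TransformFailure[] = [];
  for (const doc of docs) {
    try {
      transformed.push(transform(doc));
    } catch (e) {
      failures.push({
        id: doc._id,
        message: e instanceof Error ? e.message : String(e),
      });
    }
  }
  return { transformed, failures };
}

// Once any failure has been seen (in this batch or an earlier one), nothing
// more is written to the temp index; transforming only continues so that
// every failing document can be reported before the migration fails.
function docsToWrite(outcome: BatchOutcome, failedBefore: boolean): RawDoc[] {
  return failedBefore || outcome.failures.length > 0 ? [] : outcome.transformed;
}
```

Collecting failures instead of throwing on the first one matches the stated goal of logging the complete list of documents that failed to transform.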
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,96 @@
* Side Public License, v 1.
*/

import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors';
import * as Either from 'fp-ts/Either';
import { errors as EsErrors } from '@elastic/elasticsearch';
jest.mock('./catch_retryable_es_client_errors');
import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks';
import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors';
import { bulkOverwriteTransformedDocuments } from './bulk_overwrite_transformed_documents';

jest.mock('./catch_retryable_es_client_errors');

describe('bulkOverwriteTransformedDocuments', () => {
beforeEach(() => {
jest.clearAllMocks();
});

// Create a mock client that rejects all methods with a 503 status code
// response.
const retryableError = new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
statusCode: 503,
body: { error: { type: 'es_type', reason: 'es_reason' } },
})
);
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
);
it('resolves with `right:bulk_index_succeeded` if no error is encountered', async () => {
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createSuccessTransportRequestPromise({
items: [
{
index: {
_index: '.dolly',
},
},
{
index: {
_index: '.dolly',
},
},
],
})
);

const task = bulkOverwriteTransformedDocuments({
client,
index: 'new_index',
transformedDocs: [],
refresh: 'wait_for',
});

const result = await task();

expect(Either.isRight(result)).toBe(true);
expect((result as Either.Right<any>).right).toEqual('bulk_index_succeeded');
});

it('resolves with `right:bulk_index_succeeded` if version conflict errors are encountered', async () => {
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createSuccessTransportRequestPromise({
items: [
{
index: {
_index: '.dolly',
},
},
{
index: {
error: {
type: 'version_conflict_engine_exception',
reason: 'reason',
},
},
},
],
})
);

const task = bulkOverwriteTransformedDocuments({
client,
index: 'new_index',
transformedDocs: [],
refresh: 'wait_for',
});

const result = await task();

expect(Either.isRight(result)).toBe(true);
expect((result as Either.Right<any>).right).toEqual('bulk_index_succeeded');
});

it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
// Create a mock client that rejects all methods with a 503 status code response.
const retryableError = new EsErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
statusCode: 503,
body: { error: { type: 'es_type', reason: 'es_reason' } },
})
);
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createErrorTransportRequestPromise(retryableError)
);

const task = bulkOverwriteTransformedDocuments({
client,
index: 'new_index',
Expand All @@ -43,4 +110,93 @@ describe('bulkOverwriteTransformedDocuments', () => {

expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError);
});

it('resolves with `left:target_index_had_write_block` if all errors are write block exceptions', async () => {
const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createSuccessTransportRequestPromise({
items: [
{
index: {
error: {
type: 'cluster_block_exception',
reason:
'index [.kibana_9000] blocked by: [FORBIDDEN/8/moving to block index write (api)]',
},
},
},
{
index: {
error: {
type: 'cluster_block_exception',
reason:
'index [.kibana_9000] blocked by: [FORBIDDEN/8/moving to block index write (api)]',
},
},
},
],
})
);

const task = bulkOverwriteTransformedDocuments({
client,
index: 'new_index',
transformedDocs: [],
refresh: 'wait_for',
});

const result = await task();

expect(Either.isLeft(result)).toBe(true);
expect((result as Either.Left<any>).left).toEqual({
type: 'target_index_had_write_block',
});
});

it('throws an error if any error is not a write block exception', async () => {
(catchRetryableEsClientErrors as jest.Mock).mockImplementation((e) => {
throw e;
});

const client = elasticsearchClientMock.createInternalClient(
elasticsearchClientMock.createSuccessTransportRequestPromise({
items: [
{
index: {
error: {
type: 'cluster_block_exception',
reason:
'index [.kibana_9000] blocked by: [FORBIDDEN/8/moving to block index write (api)]',
},
},
},
{
index: {
error: {
type: 'dolly_exception',
reason: 'because',
},
},
},
{
index: {
error: {
type: 'cluster_block_exception',
reason:
'index [.kibana_9000] blocked by: [FORBIDDEN/8/moving to block index write (api)]',
},
},
},
],
})
);

const task = bulkOverwriteTransformedDocuments({
client,
index: 'new_index',
transformedDocs: [],
refresh: 'wait_for',
});

await expect(task()).rejects.toThrow();
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import {
catchRetryableEsClientErrors,
RetryableEsClientError,
} from './catch_retryable_es_client_errors';
import { isWriteBlockException } from './es_errors';
import { WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE } from './constants';
import type { TargetIndexHadWriteBlock } from './index';

/** @internal */
export interface BulkOverwriteTransformedDocumentsParams {
Expand All @@ -24,6 +26,7 @@ export interface BulkOverwriteTransformedDocumentsParams {
transformedDocs: SavedObjectsRawDoc[];
refresh?: estypes.Refresh;
}

/**
* Write the up-to-date transformed documents to the index, overwriting any
* documents that are still on their outdated version.
Expand All @@ -34,7 +37,7 @@ export const bulkOverwriteTransformedDocuments = ({
transformedDocs,
refresh = false,
}: BulkOverwriteTransformedDocumentsParams): TaskEither.TaskEither<
RetryableEsClientError,
RetryableEsClientError | TargetIndexHadWriteBlock,
'bulk_index_succeeded'
> => () => {
return client
Expand Down Expand Up @@ -71,12 +74,19 @@ export const bulkOverwriteTransformedDocuments = ({
.then((res) => {
// Filter out version_conflict_engine_exception since these just mean
// that another instance already updated these documents
const errors = (res.body.items ?? []).filter(
(item) => item.index?.error?.type !== 'version_conflict_engine_exception'
);
const errors = (res.body.items ?? [])
.filter((item) => item.index?.error)
.map((item) => item.index!.error!)
Contributor

wow! so many ?, ! 😅 I'd write it as

.map((item) => item.index?.error)
.filter(Boolean)
.filter(({ type }) => type !== 'version_conflict_engine_exception');

Contributor Author

Yea, the problem is that TS is stupid with map/filter.

.map((item) => item.index?.error)
.filter(Boolean)

is not sufficient to have the `| undefined` part removed from `error`. The third line complains that `ErrorContainer | undefined` does not have a `type` property, which is why I was kind of forced to filter first and then force-cast using `!`
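For reference, the narrowing limitation described here comes from `Boolean` not being declared as a type guard, so `.filter(Boolean)` leaves the element type unchanged. One possible alternative, which is not what this PR uses, is a user-defined type predicate. In the sketch below, `ErrorCause`, `BulkItem`, `isDefined`, and `relevantErrors` are hypothetical names and simplified shapes, not the actual Elasticsearch client types.

```typescript
// Simplified, hypothetical shapes standing in for the bulk response types.
interface ErrorCause {
  type: string;
  reason: string;
}

interface BulkItem {
  index?: { error?: ErrorCause };
}

// User-defined type guard: tells the compiler that `undefined` is filtered out.
function isDefined<T>(value: T | undefined): value is T {
  return value !== undefined;
}

function relevantErrors(items: BulkItem[]): ErrorCause[] {
  return items
    .map((item) => item.index?.error)
    // After this filter the element type is `ErrorCause`, not `ErrorCause | undefined`.
    .filter(isDefined)
    .filter(({ type }) => type !== 'version_conflict_engine_exception');
}
```

The trade-off is one extra helper in exchange for dropping the non-null assertions.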

.filter(({ type }) => type !== 'version_conflict_engine_exception');
Comment on lines -74 to +80
Contributor Author

Not even sure how this was working previously with non-error items, as

>>({}).index?.error?.type !== 'version_conflict_engine_exception'
<< true

but since I was in doubt, I added more steps for the sake of readability.
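The difference between the two filters can be reproduced in isolation. The sketch below uses simplified, hypothetical shapes (`ErrorCause`, `BulkItem`) and hypothetical function names (`oldErrors`, `newErrors`); it only illustrates the predicate logic quoted in this thread, not the surrounding action.

```typescript
interface ErrorCause {
  type: string;
  reason: string;
}

interface BulkItem {
  index?: { _index?: string; error?: ErrorCause };
}

const VERSION_CONFLICT = 'version_conflict_engine_exception';

// Previous predicate: an item without an error has `type === undefined`,
// which is `!== VERSION_CONFLICT`, so successful items are kept as well.
function oldErrors(items: BulkItem[]): BulkItem[] {
  return items.filter((item) => item.index?.error?.type !== VERSION_CONFLICT);
}

// Approach from the diff: keep only items that carry an error,
// then drop the version conflicts.
function newErrors(items: BulkItem[]): ErrorCause[] {
  return items
    .filter((item) => item.index?.error)
    .map((item) => item.index!.error!)
    .filter(({ type }) => type !== VERSION_CONFLICT);
}

// One successful item and one version conflict.
const sampleItems: BulkItem[] = [
  { index: { _index: '.dolly' } },
  { index: { error: { type: VERSION_CONFLICT, reason: 'reason' } } },
];
```

With `sampleItems`, `oldErrors` keeps the successful item (length 1), while `newErrors` returns an empty list.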


if (errors.length === 0) {
return Either.right('bulk_index_succeeded' as const);
} else {
if (errors.every(isWriteBlockException)) {
return Either.left({
type: 'target_index_had_write_block' as const,
});
}
Comment on lines +85 to +89
Contributor Author

It's very likely that if any write block exception is encountered, all the objects encountered it, but just in case another error was returned, we check that all errors are actually write block exceptions.

throw new Error(JSON.stringify(errors));
}
})
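How a caller might react to the new `target_index_had_write_block` result can be sketched as below. The `_tag`/`left`/`right` fields mirror what the inline snapshots in this PR show; everything else is an assumption made for illustration: the `retryable_es_client_error` member shape, the `NextAction` values, and the `nextAction` function are hypothetical and do not reflect the actual migration state machine.

```typescript
// Result shape modeled on the `_tag` / `left` / `right` fields seen in the
// snapshots. The retryable-error member is an assumed shape.
type BulkResult =
  | { _tag: 'Right'; right: 'bulk_index_succeeded' }
  | { _tag: 'Left'; left: { type: 'target_index_had_write_block' } }
  | { _tag: 'Left'; left: { type: 'retryable_es_client_error'; message: string } };

// Hypothetical actions a migration loop might take after one bulk write.
type NextAction = 'read_next_batch' | 'skip_to_clone_step' | 'retry_bulk_write';

function nextAction(result: BulkResult): NextAction {
  if (result._tag === 'Right') {
    return 'read_next_batch';
  }
  if (result.left.type === 'target_index_had_write_block') {
    // Another instance already finished the reindex and blocked writes
    // on the temp index, so there is nothing left to write here.
    return 'skip_to_clone_step';
  }
  return 'retry_bulk_write';
}
```

Returning the write block as a `Left` value rather than throwing lets the caller treat it as an expected outcome of running several instances in parallel.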
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import { isIncompatibleMappingException, isWriteBlockException } from './es_errors';

describe('isWriteBlockError', () => {
it('returns true for a `index write` cluster_block_exception', () => {
expect(
isWriteBlockException({
Contributor

Maybe we can add an integration test instead? Since it's easy to reproduce for an ES instance.

Contributor Author

There is already an integration test for the action using the function, but it doesn't hurt to do it for the helper itself.

type: 'cluster_block_exception',
reason: `index [.kibana_dolly] blocked by: [FORBIDDEN/8/index write (api)]`,
})
).toEqual(true);
});
it('returns true for a `moving to block index write` cluster_block_exception', () => {
expect(
isWriteBlockException({
type: 'cluster_block_exception',
reason: `index [.kibana_dolly] blocked by: [FORBIDDEN/8/moving to block index write (api)]`,
})
).toEqual(true);
});
it('returns false for incorrect type', () => {
expect(
isWriteBlockException({
type: 'not_a_cluster_block_exception_at_all',
reason: `index [.kibana_dolly] blocked by: [FORBIDDEN/8/index write (api)]`,
})
).toEqual(false);
});
});

describe('isIncompatibleMappingExceptionError', () => {
it('returns true for `strict_dynamic_mapping_exception` errors', () => {
expect(
isIncompatibleMappingException({
type: 'strict_dynamic_mapping_exception',
reason: 'idk',
})
).toEqual(true);
});

it('returns true for `mapper_parsing_exception` errors', () => {
expect(
isIncompatibleMappingException({
type: 'mapper_parsing_exception',
reason: 'idk',
})
).toEqual(true);
});
});
23 changes: 23 additions & 0 deletions src/core/server/saved_objects/migrationsv2/actions/es_errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

export interface EsErrorCause {
type: string;
reason: string;
}

export const isWriteBlockException = ({ type, reason }: EsErrorCause): boolean => {
return (
type === 'cluster_block_exception' &&
reason.match(/index \[.+] blocked by: \[FORBIDDEN\/8\/.+ \(api\)\]/) !== null
);
};
Comment on lines +14 to +19
Contributor Author

Depending on the type of operation, the reason identifying a write block can vary, e.g.

index [.kibana_dolly] blocked by: [FORBIDDEN/8/index write (api)]
index [.kibana_dolly] blocked by: [FORBIDDEN/8/moving to block index write (api)]

I extracted the function previously present in `wait_for_reindex_task.ts` and made it more generic to match any `FORBIDDEN/8/*** (api)` text.


export const isIncompatibleMappingException = ({ type }: EsErrorCause): boolean => {
return type === 'strict_dynamic_mapping_exception' || type === 'mapper_parsing_exception';
};
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,14 @@ describe('migration actions', () => {
transformedDocs: sourceDocs,
refresh: 'wait_for',
})()
).rejects.toMatchObject(expect.anything());
).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Left",
"left": Object {
"type": "target_index_had_write_block",
},
}
`);
Contributor Author

Not a fan of snapshots to test resolved results (especially as the resolved object is small), but this is what is done in the other tests of the file, and I didn't want to fix them all in this PR.

Contributor

not for all the tests

expect(res.right).toEqual(
expect.objectContaining({
existing_index_with_docs: {
aliases: {},
mappings: expect.anything(),
settings: expect.anything(),
},
})
);
});
});
so I wouldn't use snapshots in this case.

});
it('resolves left index_not_found_exception when the index does not exist', async () => {
expect.assertions(1);
Expand Down Expand Up @@ -1496,7 +1503,7 @@ describe('migration actions', () => {
}
`);
});
it('rejects if there are errors', async () => {
it('resolves left if there are write_block errors', async () => {
Comment on lines -1499 to +1503
Contributor Author

I also wanted to add an integration test asserting that it still rejects for other errors, but using a non-existent index surprisingly leads to a timeout (which is handled by `catchRetryableEsClientErrors`) instead of a more final error, and I couldn't find a way to trigger another kind of error from ES.

If anyone has an idea, I'll take it. Otherwise it's probably fine, as it's covered in unit tests anyway.

const newDocs = ([
{ _source: { title: 'doc 5' } },
{ _source: { title: 'doc 6' } },
Expand All @@ -1509,7 +1516,14 @@ describe('migration actions', () => {
transformedDocs: newDocs,
refresh: 'wait_for',
})()
).rejects.toMatchObject(expect.anything());
).resolves.toMatchInlineSnapshot(`
Object {
"_tag": "Left",
"left": Object {
"type": "target_index_had_write_block",
},
}
`);
});
});
});