From 7d7db7c346c6a245253b09918093ca35e4537149 Mon Sep 17 00:00:00 2001 From: Rudolf Meijering Date: Tue, 1 Dec 2020 14:39:23 +0100 Subject: [PATCH] model: test control state progressions --- .../saved_objects/migrationsv2/actions.ts | 207 +++++++---- .../saved_objects/migrationsv2/index.test.ts | 345 +++++++++++++++++- .../saved_objects/migrationsv2/index.ts | 240 ++++++------ 3 files changed, 591 insertions(+), 201 deletions(-) diff --git a/src/core/server/saved_objects/migrationsv2/actions.ts b/src/core/server/saved_objects/migrationsv2/actions.ts index 7c5d9028e4685bf..c3183dbde4ecbe1 100644 --- a/src/core/server/saved_objects/migrationsv2/actions.ts +++ b/src/core/server/saved_objects/migrationsv2/actions.ts @@ -17,12 +17,15 @@ * under the License. */ +/* eslint-disable no-console */ + import * as Either from 'fp-ts/lib/Either'; import * as TaskEither from 'fp-ts/lib/TaskEither'; import * as Option from 'fp-ts/lib/Option'; import { ElasticsearchClientError } from '@elastic/elasticsearch/lib/errors'; import { pipe } from 'fp-ts/lib/pipeable'; import { errors as EsErrors } from '@elastic/elasticsearch'; +import { flow } from 'fp-ts/lib/function'; import { ElasticsearchClient } from '../../elasticsearch'; import { IndexMapping } from '../mappings'; import { SavedObjectsRawDoc } from '../serialization'; @@ -37,12 +40,6 @@ export type AllResponses = | UpdateByQueryResponse | UpdateAndPickupMappingsResponse; -export type ExpectedErrors = - | EsErrors.NoLivingConnectionsError - | EsErrors.ConnectionError - | EsErrors.TimeoutError - | EsErrors.ResponseError; - const retryResponseStatuses = [ 503, // ServiceUnavailable 401, // AuthorizationException @@ -51,7 +48,14 @@ const retryResponseStatuses = [ 410, // Gone ]; -const catchEsClientErrors = (e: ElasticsearchClientError) => { +export interface RetryableEsClientError { + type: 'retryable_es_client_error'; + error: EsErrors.ElasticsearchClientError; +} + +const catchRetryableEsClientErrors = ( + e: 
EsErrors.ElasticsearchClientError +): Either.Either => { if ( e instanceof EsErrors.NoLivingConnectionsError || e instanceof EsErrors.ConnectionError || @@ -60,7 +64,7 @@ const catchEsClientErrors = (e: ElasticsearchClientError) => { (retryResponseStatuses.includes(e.statusCode) || e.body?.error?.type === 'snapshot_in_progress_exception')) ) { - return Either.left(e); + return Either.left({ type: 'retryable_es_client_error' as const, error: e }); } else { throw e; } @@ -81,7 +85,7 @@ export type FetchIndexResponse = Record< export const fetchIndices = ( client: ElasticsearchClient, indicesToFetch: string[] -): TaskEither.TaskEither => () => { +): TaskEither.TaskEither => () => { return client.indices .get( { @@ -93,7 +97,7 @@ export const fetchIndices = ( .then(({ body }) => { return Either.right(body); }) - .catch(catchEsClientErrors); + .catch(catchRetryableEsClientErrors); }; export interface SetIndexWriteBlockResponse { @@ -116,29 +120,10 @@ export interface SetIndexWriteBlockResponse { export const setWriteBlock = ( client: ElasticsearchClient, index: string -): TaskEither.TaskEither => () => { - return client.indices - .addBlock<{ - acknowledged: boolean; - shards_acknowledged: boolean; - }>( - { - index, - block: 'write', - }, - { maxRetries: 0 /** handle retry ourselves for now */ } - ) - .then((res) => { - return Either.right(res.body); - }) - .catch(catchEsClientErrors); -}; - -export const setWriteBlock2 = ( - client: ElasticsearchClient, - index: string ): TaskEither.TaskEither< - 'set_write_block_failed' | 'index_not_found_exception', + | { type: 'set_write_block_failed' } + | { type: 'index_not_found_exception' } + | RetryableEsClientError, 'set_write_block_succeeded' > => () => { return client.indices @@ -155,29 +140,30 @@ export const setWriteBlock2 = ( .then((res) => { return res.body.acknowledged === true ? 
Either.right('set_write_block_succeeded' as const) - : Either.left('set_write_block_failed' as const); + : Either.left({ type: 'set_write_block_failed' as const }); }) .catch((e: ElasticsearchClientError) => { if (e instanceof EsErrors.ResponseError) { if (e.message === 'index_not_found_exception') { - return Either.left(e.message); + return Either.left({ type: 'index_not_found_exception' as const }); } } throw e; - }); + }) + .catch(catchRetryableEsClientErrors); }; const waitForStatus = ( client: ElasticsearchClient, index: string, status: 'green' | 'yellow' | 'red' -): TaskEither.TaskEither => () => { +): TaskEither.TaskEither => () => { return client.cluster .health({ index, wait_for_status: status, timeout: '30s' }) .then(() => { return Either.right({}); }) - .catch(catchEsClientErrors); + .catch(catchRetryableEsClientErrors); }; export type CloneIndexResponse = AcknowledgeResponse; @@ -200,8 +186,8 @@ export const cloneIndex = ( client: ElasticsearchClient, source: string, target: string -): TaskEither.TaskEither => { - const cloneTask: TaskEither.TaskEither = () => { +): TaskEither.TaskEither => { + const cloneTask: TaskEither.TaskEither = () => { return client.indices .clone( { @@ -244,9 +230,10 @@ export const cloneIndex = ( shardsAcknowledged: false, }); } else { - return catchEsClientErrors(error); + throw error; } - }); + }) + .catch(catchRetryableEsClientErrors); }; return pipe( @@ -270,6 +257,7 @@ export const cloneIndex = ( }; export interface WaitForTaskResponse { + error: Option.Option<{ type: string; reason: string; index: string }>; completed: boolean; failures: Option.Option; description: string; @@ -288,13 +276,13 @@ export const waitForTask = ( client: ElasticsearchClient, taskId: string, timeout: string -): TaskEither.TaskEither => () => { +): TaskEither.TaskEither => () => { return client.tasks .get<{ completed: boolean; response: { failures: any[] }; task: { description: string }; - error: { type: string; reason: string }; + error: { type: 
string; reason: string; index: string }; }>({ task_id: taskId, wait_for_completion: true, @@ -302,17 +290,17 @@ export const waitForTask = ( }) .then((res) => { const body = res.body; - if (res.body.error ?? false) { - return Either.left(new EsErrors.ResponseError(res)); - } + console.log('\n' + JSON.stringify(res) + '\n'); return Either.right({ completed: body.completed, + error: Option.fromNullable(body.error), failures: - body.response.failures.length > 0 ? Option.some(body.response.failures) : Option.none, + (body.response?.failures ?? []).length > 0 + ? Option.some(body.response.failures) + : Option.none, description: body.task.description, }); - }) - .catch(catchEsClientErrors); + }); }; // eslint-disable-next-line @typescript-eslint/no-empty-interface @@ -321,13 +309,13 @@ export interface DeleteIndexResponse {} export const deleteIndex = ( client: ElasticsearchClient, index: string -): TaskEither.TaskEither => () => { +): TaskEither.TaskEither => () => { return client.indices .delete({ index, timeout: '60s' }) .then(() => { return Either.right({}); }) - .catch(catchEsClientErrors); + .catch(catchRetryableEsClientErrors); }; export interface UpdateByQueryResponse { @@ -352,7 +340,7 @@ export const updateByQuery = ( client: ElasticsearchClient, index: string, script?: string -): TaskEither.TaskEither => () => { +): TaskEither.TaskEither => () => { return client .updateByQuery({ // Ignore version conflicts that can occur from parralel update by query operations @@ -381,7 +369,7 @@ export const updateByQuery = ( .then(({ body: { task: taskId } }) => { return Either.right({ taskId }); }) - .catch(catchEsClientErrors); + .catch(catchRetryableEsClientErrors); }; export interface ReindexResponse { @@ -407,16 +395,17 @@ export const reindex = ( sourceIndex: string, targetIndex: string, reindexScript: Option.Option -): TaskEither.TaskEither => () => { +): TaskEither.TaskEither => () => { return client .reindex({ + // Require targetIndex to be an alias. 
Prevents a new index from being + // created if targetIndex doesn't exist. body: { // Ignore version conflicts from existing documents conflicts: 'proceed', source: { index: sourceIndex, - // Set batch size to 100, not sure if it's necessary to make this - // smaller than the default of 1000? + // Set batch size to 100 size: 100, }, dest: { @@ -439,12 +428,47 @@ export const reindex = ( }) .then(({ body: { task: taskId } }) => { return Either.right({ taskId }); - }) - .catch(catchEsClientErrors); + }); }; -// eslint-disable-next-line @typescript-eslint/no-empty-interface -export interface UpdateAliasesResponse {} +export const waitForReindexTask = flow( + waitForTask, + TaskEither.chain( + ( + res + ): TaskEither.TaskEither< + | { type: 'index_not_found_exception'; index: string } + | { type: 'target_index_had_write_block' }, + 'reindex_succeded' + > => { + const failureIsAWriteBlock = ({ type, reason }: { type: string; reason: string }) => + type === 'cluster_block_exception' && + reason.match(/index \[.+] blocked by: \[FORBIDDEN\/8\/index write \(api\)\]/); + + if (Option.isSome(res.error) && res.error.value.type === 'index_not_found_exception') { + return TaskEither.left({ + type: 'index_not_found_exception' as const, + index: res.error.value.index, + }); + } else if (Option.isSome(res.failures)) { + if (res.failures.value.every(failureIsAWriteBlock)) { + return TaskEither.left({ type: 'target_index_had_write_block' as const }); + } else { + // TODO: Reindex will automatically retry read and write failures. + // Is it still necessary to handle retryable failures like + // snapshot_in_progress_exception ourselves? If this is rare enough + // it's sufficient to let the whole migration crash and retry the + // migration from scratch instead of retrying the reindex step only. 
+ throw new Error( + 'Reindex failed with the following failures:\n' + JSON.stringify(res.failures) + ); + } + } else { + return TaskEither.right('reindex_succeded' as const); + } + } + ) +); export type AliasAction = | { remove_index: { index: string } } @@ -459,7 +483,11 @@ export type AliasAction = export const updateAliases = ( client: ElasticsearchClient, aliasActions: AliasAction[] -): TaskEither.TaskEither => () => { +): TaskEither.TaskEither< + | { type: 'index_not_found_exception'; index: string } + | { type: 'remove_index_not_a_concrete_index' }, + 'succeded' +> => () => { return client.indices .updateAliases({ body: { @@ -467,13 +495,24 @@ export const updateAliases = ( }, }) .then((res) => { - return Either.right({}); + return Either.right('succeded' as const); }) - .catch((err) => { - console.log(err.meta.body); + .catch((err: EsErrors.ElasticsearchClientError) => { + console.log(JSON.stringify(err)); + if (err instanceof EsErrors.ResponseError) { + if (err.message === 'index_not_found_exception') { + return Either.left({ type: 'index_not_found_exception' as const, index: err.body?.error?.index }); + } else if ( + err.message === 'illegal_argument_exception' && + err.body?.error?.reason?.match( + /The provided expression \[.+\] matches an alias, specify the corresponding concrete indices instead\./ + ) + ) { + return Either.left({ type: 'remove_index_not_a_concrete_index' as const }); + } + } throw err; - }) - .catch(catchEsClientErrors); + }); }; export interface AcknowledgeResponse { @@ -481,8 +520,6 @@ export interface AcknowledgeResponse { shardsAcknowledged: boolean; } -export type CreateIndexResponse = AcknowledgeResponse; - /** * Creates an index with the given mappings * @@ -500,8 +537,11 @@ export const createIndex = ( client: ElasticsearchClient, indexName: string, mappings: IndexMapping -): TaskEither.TaskEither => { - const createIndexTask: TaskEither.TaskEither = () => { +): TaskEither.TaskEither => { + const createIndexTask: TaskEither.TaskEither< 
RetryableEsClientError, + AcknowledgeResponse + > = () => { return client.indices .create( { @@ -515,6 +555,7 @@ export const createIndex = ( body: { mappings, settings: { + 'index.blocks.write': true, // ES rule of thumb: shards should be several GB to 10's of GB, so // Kibana is unlikely to cross that limit. number_of_shards: 1, @@ -555,9 +596,10 @@ export const createIndex = ( shardsAcknowledged: false, }); } else { - return catchEsClientErrors(error); + throw error; } - }); + }) + .catch(catchRetryableEsClientErrors); }; return pipe( @@ -565,14 +607,14 @@ export const createIndex = ( TaskEither.chain((res) => { if (res.acknowledged && res.shardsAcknowledged) { // If the cluster state was updated and all shards ackd we're done - return TaskEither.right(res); + return TaskEither.right('create_index_succeeded'); } else { // Otherwise, wait until the target index has a 'green' status. return pipe( waitForStatus(client, indexName, 'green'), TaskEither.map(() => { /** When the index status is 'green' we know that all shards were started */ - return { acknowledged: true, shardsAcknowledged: true }; + return 'create_index_succeeded'; }) ); } @@ -596,8 +638,11 @@ export const updateAndPickupMappings = ( client: ElasticsearchClient, index: string, mappings: IndexMapping -): TaskEither.TaskEither => { - const putMappingTask: TaskEither.TaskEither = () => { +): TaskEither.TaskEither => { + const putMappingTask: TaskEither.TaskEither< + RetryableEsClientError, + { acknowledged: boolean } + > = () => { return client.indices .putMapping, IndexMapping>({ index, @@ -608,7 +653,7 @@ export const updateAndPickupMappings = ( // TODO do we need to check res.body.acknowledged? 
return Either.right({ acknowledged: res.body.acknowledged }); }) - .catch(catchEsClientErrors); + .catch(catchRetryableEsClientErrors); }; return pipe( @@ -626,7 +671,7 @@ export const search = ( client: ElasticsearchClient, index: string, query: Record -): TaskEither.TaskEither => () => { +): TaskEither.TaskEither => () => { return client .search<{ // when `filter_path` is specified, ES doesn't return empty arrays, so if @@ -666,14 +711,14 @@ export const search = ( ], }) .then((res) => Either.right({ hits: res.body.hits?.hits ?? [] })) - .catch(catchEsClientErrors); + .catch(catchRetryableEsClientErrors); }; export const bulkIndex = ( client: ElasticsearchClient, index: string, docs: SavedObjectsRawDoc[] -): TaskEither.TaskEither => () => { +): TaskEither.TaskEither => () => { return client .bulk<{ took: number; @@ -723,5 +768,5 @@ export const bulkIndex = ( return Either.left(new Error(JSON.stringify(errors))); } }) - .catch(catchEsClientErrors); + .catch(catchRetryableEsClientErrors); }; diff --git a/src/core/server/saved_objects/migrationsv2/index.test.ts b/src/core/server/saved_objects/migrationsv2/index.test.ts index 180ddc45d97c37a..c62d93eaa6bede4 100644 --- a/src/core/server/saved_objects/migrationsv2/index.test.ts +++ b/src/core/server/saved_objects/migrationsv2/index.test.ts @@ -20,7 +20,22 @@ import { ResponseError } from '@elastic/elasticsearch/lib/errors'; import * as Either from 'fp-ts/lib/Either'; import * as Option from 'fp-ts/lib/Option'; -import { FatalState, model, next, State, ResponseType, LegacySetWriteBlockState } from '.'; +import { + FatalState, + model, + next, + State, + ResponseType, + LegacySetWriteBlockState, + SetSourceWriteBlockState, + LegacyCreateReindexTargetState, + LegacyReindexState, + LegacyReindexWaitForTaskState, + LegacyDeleteState, + CloneSourceToTargetState, + UpdateTargetMappingsState, + UpdateTargetMappingsWaitForTaskState, +} from '.'; import { ElasticsearchClient } from '../../elasticsearch'; describe('migrations 
v2', () => { @@ -31,7 +46,20 @@ describe('migrations v2', () => { retryDelay: 0, indexPrefix: '.kibana', outdatedDocumentsQuery: {}, - targetMappings: { properties: {}, _meta: { migrationMappingPropertyHashes: {} } }, + targetMappings: { + properties: { + new_saved_object_type: { + properties: { + value: { type: 'text' }, + }, + }, + }, + _meta: { + migrationMappingPropertyHashes: { + new_saved_object_type: '4a11183eee21e6fbad864f7a30b39ad0', + }, + }, + }, preMigrationScript: Option.none, }; @@ -113,13 +141,47 @@ describe('migrations v2', () => { '.kibana': {}, '.kibana_7.11.0': {}, }, - mappings: { properties: {}, _meta: { migrationMappingPropertyHashes: {} } }, + mappings: { + properties: { + disabled_saved_object_type: { + properties: { + value: { type: 'keyword' }, + }, + }, + }, + _meta: { + migrationMappingPropertyHashes: { + disabled_saved_object_type: '7997cf5a56cc02bdc9c93361bde732b0', + }, + }, + }, settings: {}, }, }); const newState = model(initState, res); expect(newState.controlState).toEqual('UPDATE_TARGET_MAPPINGS'); + expect(newState.targetMappings).toMatchInlineSnapshot(` + Object { + "_meta": Object { + "migrationMappingPropertyHashes": Object { + "disabled_saved_object_type": "7997cf5a56cc02bdc9c93361bde732b0", + "new_saved_object_type": "4a11183eee21e6fbad864f7a30b39ad0", + }, + }, + "properties": Object { + "new_saved_object_type": Object { + "properties": Object { + "value": Object { + "type": "text", + }, + }, + }, + }, + } + `); + expect(newState.retryCount).toEqual(0); + expect(newState.retryDelay).toEqual(0); }); test("INIT -> FATAL when .kibana points to newer version's index", () => { const res: ResponseType<'INIT'> = Either.right({ @@ -164,6 +226,8 @@ describe('migrations v2', () => { source: Option.some('.kibana_7.11.0_001'), target: '.kibana_7.12.0_001', }); + expect(newState.retryCount).toEqual(0); + expect(newState.retryDelay).toEqual(0); }); test('INIT -> SET_SOURCE_WRITE_BLOCK when migrating from a v1 migrations index (>= 6.5 < 
7.11.0)', () => { const res: ResponseType<'INIT'> = Either.right({ @@ -182,6 +246,8 @@ describe('migrations v2', () => { source: Option.some('.kibana_3'), target: '.kibana_7.11.0_001', }); + expect(newState.retryCount).toEqual(0); + expect(newState.retryDelay).toEqual(0); }); test('INIT -> LEGACY_SET_WRITE_BLOCK when migrating from a legacy index (>= 6.0.0 < 6.5)', () => { const res: ResponseType<'INIT'> = Either.right({ @@ -198,6 +264,8 @@ describe('migrations v2', () => { source: Option.some('.kibana_pre6.5.0_001'), target: '.kibana_7.11.0_001', }); + expect(newState.retryCount).toEqual(0); + expect(newState.retryDelay).toEqual(0); }); test('INIT -> CREATE_NEW_TARGET when no indices/aliases exist', () => { const res: ResponseType<'INIT'> = Either.right({}); @@ -208,6 +276,8 @@ describe('migrations v2', () => { source: Option.none, target: '.kibana_7.11.0_001', }); + expect(newState.retryCount).toEqual(0); + expect(newState.retryDelay).toEqual(0); }); }); describe('LEGACY_SET_WRITE_BLOCK', () => { @@ -221,39 +291,280 @@ describe('migrations v2', () => { legacyPreMigrationDoneActions: [], legacy: '', }; - test('LEGACY_SET_WRITE_BLOCK -> LEGACY_CREATE_REINDEX_TARGET if action fails with index_not_found_exception', () => { - const res: ResponseType<'LEGACY_SET_WRITE_BLOCK'> = Either.left( - 'index_not_found_exception' - ); - const newState = model(legacySetWriteBlockState, res); - expect(newState.controlState).toEqual('LEGACY_CREATE_REINDEX_TARGET'); - }); test('LEGACY_SET_WRITE_BLOCK -> LEGACY_SET_WRITE_BLOCK if action fails with set_write_block_failed', () => { - const res: ResponseType<'LEGACY_SET_WRITE_BLOCK'> = Either.left('set_write_block_failed'); + const res: ResponseType<'LEGACY_SET_WRITE_BLOCK'> = Either.left({ + type: 'set_write_block_failed', + }); const newState = model(legacySetWriteBlockState, res); expect(newState.controlState).toEqual('LEGACY_SET_WRITE_BLOCK'); expect(newState.retryCount).toEqual(1); expect(newState.retryDelay).toEqual(2000); }); - 
test('LEGACY_SET_WRITE_BLOCK -> LEGACY_CREATE_REINDEX_TARGET if action res has set_write_block_succeeded', () => { + test('LEGACY_SET_WRITE_BLOCK -> LEGACY_CREATE_REINDEX_TARGET if action fails with index_not_found_exception', () => { + const res: ResponseType<'LEGACY_SET_WRITE_BLOCK'> = Either.left({ + type: 'index_not_found_exception', + }); + const newState = model(legacySetWriteBlockState, res); + expect(newState.controlState).toEqual('LEGACY_CREATE_REINDEX_TARGET'); + expect(newState.retryCount).toEqual(0); + expect(newState.retryDelay).toEqual(0); + }); + test('LEGACY_SET_WRITE_BLOCK -> LEGACY_CREATE_REINDEX_TARGET if action succeeds with set_write_block_succeeded', () => { const res: ResponseType<'LEGACY_SET_WRITE_BLOCK'> = Either.right( 'set_write_block_succeeded' ); const newState = model(legacySetWriteBlockState, res); expect(newState.controlState).toEqual('LEGACY_CREATE_REINDEX_TARGET'); + expect(newState.retryCount).toEqual(0); + expect(newState.retryDelay).toEqual(0); + }); + }); + describe('LEGACY_CREATE_REINDEX_TARGET', () => { + const legacyCreateReindexTargetState: LegacyCreateReindexTargetState = { + ...baseState, + controlState: 'LEGACY_CREATE_REINDEX_TARGET', + versionIndexReadyActions: Option.none, + source: Option.some('.kibana') as Option.Some, + target: '.kibana_7.11.0_001', + legacyReindexTargetMappings: { properties: {} }, + legacyPreMigrationDoneActions: [], + legacy: '', + }; + test('LEGACY_CREATE_REINDEX_TARGET -> LEGACY_REINDEX', () => { + const res: ResponseType<'LEGACY_CREATE_REINDEX_TARGET'> = Either.right( + 'create_index_succeeded' + ); + const newState = model(legacyCreateReindexTargetState, res); + expect(newState.controlState).toEqual('LEGACY_REINDEX'); + expect(newState.retryCount).toEqual(0); + expect(newState.retryDelay).toEqual(0); + }); + // The createIndex action called by LEGACY_CREATE_REINDEX_TARGET never + // returns a left, it will always succeed or timeout. 
Since timeout + // failures are always retried we don't explicity test this logic + }); + describe('LEGACY_REINDEX', () => { + const legacyReindexState: LegacyReindexState = { + ...baseState, + controlState: 'LEGACY_REINDEX', + versionIndexReadyActions: Option.none, + source: Option.some('.kibana') as Option.Some, + target: '.kibana_7.11.0_001', + legacyReindexTargetMappings: { properties: {} }, + legacyPreMigrationDoneActions: [], + legacy: '', + }; + test('LEGACY_REINDEX -> LEGACY_REINDEX_WAIT_FOR_TASK', () => { + const res: ResponseType<'LEGACY_REINDEX'> = Either.right({ taskId: 'task id' }); + const newState = model(legacyReindexState, res); + expect(newState.controlState).toEqual('LEGACY_REINDEX_WAIT_FOR_TASK'); + expect(newState.retryCount).toEqual(0); + expect(newState.retryDelay).toEqual(0); + }); + }); + describe('LEGACY_REINDEX_WAIT_FOR_TASK', () => { + const legacyReindexWaitForTaskState: LegacyReindexWaitForTaskState = { + ...baseState, + controlState: 'LEGACY_REINDEX_WAIT_FOR_TASK', + versionIndexReadyActions: Option.none, + source: Option.some('source_index_name') as Option.Some, + target: '.kibana_7.11.0_001', + legacyReindexTargetMappings: { properties: {} }, + legacyPreMigrationDoneActions: [], + legacy: 'legacy_index_name', + legacyReindexTaskId: 'test_task_id', + }; + test('LEGACY_REINDEX_WAIT_FOR_TASK -> LEGACY_DELETE if action succeeds', () => { + const res: ResponseType<'LEGACY_REINDEX_WAIT_FOR_TASK'> = Either.right( + 'reindex_succeded' + ); + const newState = model(legacyReindexWaitForTaskState, res); + expect(newState.controlState).toEqual('LEGACY_DELETE'); + expect(newState.retryCount).toEqual(0); + expect(newState.retryDelay).toEqual(0); }); + test('LEGACY_REINDEX_WAIT_FOR_TASK -> LEGACY_DELETE if action fails with index_not_found_exception for reindex source', () => { + const res: ResponseType<'LEGACY_REINDEX_WAIT_FOR_TASK'> = Either.left({ + type: 'index_not_found_exception', + index: 'legacy_index_name', + }); + const newState = 
model(legacyReindexWaitForTaskState, res); + expect(newState.controlState).toEqual('LEGACY_DELETE'); + expect(newState.retryCount).toEqual(0); + expect(newState.retryDelay).toEqual(0); + }); + test('LEGACY_REINDEX_WAIT_FOR_TASK -> LEGACY_DELETE if action fails with target_index_had_write_block', () => { + const res: ResponseType<'LEGACY_REINDEX_WAIT_FOR_TASK'> = Either.left({ + type: 'target_index_had_write_block', + }); + const newState = model(legacyReindexWaitForTaskState, res); + expect(newState.controlState).toEqual('LEGACY_DELETE'); + expect(newState.retryCount).toEqual(0); + expect(newState.retryDelay).toEqual(0); + }); + test('LEGACY_REINDEX_WAIT_FOR_TASK -> FATAL if action fails with index_not_found_exception for reindex target', () => { + const res: ResponseType<'LEGACY_REINDEX_WAIT_FOR_TASK'> = Either.left({ + type: 'index_not_found_exception', + index: 'source_index_name', + }); + const newState = model(legacyReindexWaitForTaskState, res); + expect(newState.controlState).toEqual('FATAL'); + expect(newState.logs[0]).toMatchInlineSnapshot(` + Object { + "level": "error", + "message": "LEGACY_REINDEX failed because the reindex destination index [source_index_name] does not exist.", + } + `); + }); + }); + describe('LEGACY_DELETE', () => { + const legacyDeleteState: LegacyDeleteState = { + ...baseState, + controlState: 'LEGACY_DELETE', + versionIndexReadyActions: Option.none, + source: Option.some('source_index_name') as Option.Some, + target: '.kibana_7.11.0_001', + legacyReindexTargetMappings: { properties: {} }, + legacyPreMigrationDoneActions: [], + legacy: 'legacy_index_name', + }; + test('LEGACY_DELETE -> SET_SOURCE_WRITE_BLOCK if action succeeds', () => { + const res: ResponseType<'LEGACY_DELETE'> = Either.right('succeded'); + const newState = model(legacyDeleteState, res); + expect(newState.controlState).toEqual('SET_SOURCE_WRITE_BLOCK'); + expect(newState.retryCount).toEqual(0); + expect(newState.retryDelay).toEqual(0); + }); + test('LEGACY_DELETE 
-> SET_SOURCE_WRITE_BLOCK if action fails with index_not_found_exception for legacy index', () => { + const res: ResponseType<'LEGACY_DELETE'> = Either.left({ + type: 'index_not_found_exception', + index: 'legacy_index_name', + }); + const newState = model(legacyDeleteState, res); + expect(newState.controlState).toEqual('SET_SOURCE_WRITE_BLOCK'); + expect(newState.retryCount).toEqual(0); + expect(newState.retryDelay).toEqual(0); + }); + test('LEGACY_DELETE -> SET_SOURCE_WRITE_BLOCK if action fails with remove_index_not_a_concrete_index', () => { + const res: ResponseType<'LEGACY_DELETE'> = Either.left({ + type: 'remove_index_not_a_concrete_index', + }); + const newState = model(legacyDeleteState, res); + expect(newState.controlState).toEqual('SET_SOURCE_WRITE_BLOCK'); + expect(newState.retryCount).toEqual(0); + expect(newState.retryDelay).toEqual(0); + }); + test('LEGACY_DELETE -> FATAL if action fails with index_not_found_exception for source index', () => { + const res: ResponseType<'LEGACY_DELETE'> = Either.left({ + type: 'index_not_found_exception', + index: 'source_index_name', + }); + const newState = model(legacyDeleteState, res); + expect(newState.controlState).toEqual('FATAL'); + expect(newState.logs[0]).toMatchInlineSnapshot(` + Object { + "level": "error", + "message": "LEGACY_DELETE failed because the source index [source_index_name] does not exist.", + } + `); + }); + }); + describe('SET_SOURCE_WRITE_BLOCK', () => { + const setWriteBlockState: SetSourceWriteBlockState = { + ...baseState, + controlState: 'SET_SOURCE_WRITE_BLOCK', + versionIndexReadyActions: Option.none, + source: Option.some('.kibana') as Option.Some, + target: '.kibana_7.11.0_001', + }; + test('SET_SOURCE_WRITE_BLOCK -> SET_SOURCE_WRITE_BLOCK if action fails with set_write_block_failed', () => { + const res: ResponseType<'SET_SOURCE_WRITE_BLOCK'> = Either.left({ + type: 'set_write_block_failed', + }); + const newState = model(setWriteBlockState, res); + 
expect(newState.controlState).toEqual('SET_SOURCE_WRITE_BLOCK'); + expect(newState.retryCount).toEqual(1); + expect(newState.retryDelay).toEqual(2000); + }); + test('SET_SOURCE_WRITE_BLOCK -> CLONE_SOURCE_TO_TARGET if action fails with index_not_found_exception', () => { + const res: ResponseType<'SET_SOURCE_WRITE_BLOCK'> = Either.left({ + type: 'index_not_found_exception', + }); + const newState = model(setWriteBlockState, res); + expect(newState.controlState).toEqual('CLONE_SOURCE_TO_TARGET'); + expect(newState.retryCount).toEqual(0); + expect(newState.retryDelay).toEqual(0); + }); + test('SET_SOURCE_WRITE_BLOCK -> CLONE_SOURCE_TO_TARGET if action succeeds with set_write_block_succeeded', () => { + const res: ResponseType<'SET_SOURCE_WRITE_BLOCK'> = Either.right( + 'set_write_block_succeeded' + ); + const newState = model(setWriteBlockState, res); + expect(newState.controlState).toEqual('CLONE_SOURCE_TO_TARGET'); + expect(newState.retryCount).toEqual(0); + expect(newState.retryDelay).toEqual(0); + }); + }); + describe('CLONE_SOURCE_TO_TARGET', () => { + const cloneSourceToTargetState: CloneSourceToTargetState = { + ...baseState, + controlState: 'CLONE_SOURCE_TO_TARGET', + versionIndexReadyActions: Option.none, + source: Option.some('.kibana') as Option.Some, + target: '.kibana_7.11.0_001', + }; + test('CLONE_SOURCE_TO_TARGET -> UPDATE_TARGET_MAPPINGS', () => { + const res: ResponseType<'CLONE_SOURCE_TO_TARGET'> = Either.right({ + acknowledged: true, + shardsAcknowledged: true, + }); + const newState = model(cloneSourceToTargetState, res); + expect(newState.controlState).toEqual('UPDATE_TARGET_MAPPINGS'); + expect(newState.retryCount).toEqual(0); + expect(newState.retryDelay).toEqual(0); + }); + }); + describe('UPDATE_TARGET_MAPPINGS', () => { + const updateTargetMappingsState: UpdateTargetMappingsState = { + ...baseState, + controlState: 'UPDATE_TARGET_MAPPINGS', + versionIndexReadyActions: Option.none, + source: Option.some('.kibana') as Option.Some, + target: 
'.kibana_7.11.0_001', + }; + test('UPDATE_TARGET_MAPPINGS -> UPDATE_TARGET_MAPPINGS_WAIT_FOR_TASK', () => { + const res: ResponseType<'UPDATE_TARGET_MAPPINGS'> = Either.right({ + taskId: 'update target mappings task', + }); + const newState = model( + updateTargetMappingsState, + res + ) as UpdateTargetMappingsWaitForTaskState; + expect(newState.controlState).toEqual('UPDATE_TARGET_MAPPINGS_WAIT_FOR_TASK'); + expect(newState.updateTargetMappingsTaskId).toEqual('update target mappings task'); + expect(newState.retryCount).toEqual(0); + expect(newState.retryDelay).toEqual(0); + }); + }); + describe('UPDATE_TARGET_MAPPINGS_WAIT_FOR_TASK', () => { + test.todo('UPDATE_TARGET_MAPPINGS_WAIT_FOR_TASK -> OUTDATED_DOCUMENTS_SEARCH'); + }); + describe('OUTDATED_DOCUMENTS_SEARCH', () => { + test.todo('OUTDATED_DOCUMENTS_SEARCH -> OUTDATED_DOCUMENTS_TRANSFORM'); + test.todo('OUTDATED_DOCUMENTS_SEARCH -> MARK_VERSION_INDEX_READY'); + test.todo('OUTDATED_DOCUMENTS_SEARCH -> DONE'); + }); + describe('OUTDATED_DOCUMENTS_TRANSFORM', () => { + test.todo('OUTDATED_DOCUMENTS_TRANSFORM -> OUTDATED_DOCUMENTS_SEARCH'); }); - describe('', () => { - test.todo(''); + describe('CREATE_NEW_TARGET', () => { + test.todo('CREATE_NEW_TARGET -> MARK_VERSION_INDEX_READY'); }); }); }); describe('next', () => { it.todo('when state.retryDelay > 0 delays execution of the next action'); - it.todo('INIT returns fetchAliases thunk'); - it.todo('SET_SOURCE_WRITE_BLOCK returns setIndexWriteBlock thunk'); - it.todo('CLONE_SOURCE returns cloneIndex thunk'); it('DONE returns null', () => { const state: State = { ...baseState, ...{ controlState: 'DONE' } }; const action = next({} as ElasticsearchClient, (() => {}) as any, state); diff --git a/src/core/server/saved_objects/migrationsv2/index.ts b/src/core/server/saved_objects/migrationsv2/index.ts index 19db55d45992936..040dcb8c33559ea 100644 --- a/src/core/server/saved_objects/migrationsv2/index.ts +++ b/src/core/server/saved_objects/migrationsv2/index.ts @@ 
-90,6 +90,7 @@ export type CreateNewTargetState = PostInitState & { /** Blank ES cluster, create a new version-specific target index */ controlState: 'CREATE_NEW_TARGET'; source: Option.None; + versionIndexReadyActions: Option.Some; }; export type CloneSourceToTargetState = PostInitState & { @@ -164,7 +165,7 @@ export type LegacyReindexState = LegacyBaseState & { export type LegacyReindexWaitForTaskState = LegacyBaseState & { /** Apply the pre-migration script to the legacy clone to prepare it for a migration */ controlState: 'LEGACY_REINDEX_WAIT_FOR_TASK'; - preMigrationUpdateTaskId: string; + legacyReindexTaskId: string; }; export type LegacyDeleteState = LegacyBaseState & { @@ -234,14 +235,16 @@ const legacyIndex = (state: InitState) => state.indexPrefix; const versionIndex = (state: InitState) => `${state.indexPrefix}_${state.kibanaVersion}_001`; /** - * Merge the _meta mappings of an index with the given target mappings. + * Merge the _meta.migrationMappingPropertyHashes mappings of an index with + * the given target mappings. * * @remarks Mapping updates are commutative (deeply merged) by Elasticsearch, * except for the _meta key. The source index we're migrating from might * contain documents created by a plugin that is disabled in the Kibana - * instance performing this migration. We merge the _meta mappings from the - * source index into the targetMappings to ensure that any - * `migrationPropertyHashes` for disabled plugins aren't lost. + * instance performing this migration. We merge the + * _meta.migrationMappingPropertyHashes mappings from the source index into + * the targetMappings to ensure that any `migrationPropertyHashes` for + * disabled plugins aren't lost. * * Right now we don't use these `migrationPropertyHashes` but it could be used * in the future to optimize the `UPDATE_TARGET_MAPPINGS` step. 
@@ -249,7 +252,10 @@ const versionIndex = (state: InitState) => `${state.indexPrefix}_${state.kibanaV * @param targetMappings * @param indexMappings */ -function mergeMappings(targetMappings: IndexMapping, indexMappings: IndexMapping) { +function mergeMigrationMappingPropertyHashes( + targetMappings: IndexMapping, + indexMappings: IndexMapping +) { return { ...targetMappings, _meta: { @@ -282,30 +288,6 @@ export type ResponseType = Await< ReturnType> >; -const isRetryableEsClientError = (e: EsClientErrors.ElasticsearchClientError): boolean => { - const retryResponseStatuses = [ - 503, // ServiceUnavailable - 401, // AuthorizationException - 403, // AuthenticationException - 408, // RequestTimeout - 410, // Gone - 429, // TooManyRequests - ]; - - if ( - e instanceof EsClientErrors.NoLivingConnectionsError || - e instanceof EsClientErrors.ConnectionError || - e instanceof EsClientErrors.TimeoutError || - (e instanceof EsClientErrors.ResponseError && - (retryResponseStatuses.includes(e.statusCode) || - e.body?.error?.type === 'snapshot_in_progress_exception')) - ) { - return true; - } else { - return false; - } -}; - const delayOrResetRetryState = ( state: S, resW: ResponseType @@ -352,9 +334,7 @@ const delayOrResetRetryState = ( export const model = ( currentState: State, - resW: - | ResponseType - | Either.Either + resW: ResponseType | Either.Either ): State => { // The action response `resW` is weakly typed, the type includes all action // responses and ElasticsearchClientErrors. 
Each control state only triggers @@ -365,13 +345,11 @@ export const model = ( let stateP: State = cloneDeep(currentState); if (Either.isLeft(resW)) { - if (resW.left instanceof Error) { - if (isRetryableEsClientError(resW.left)) { - stateP = delayOrResetRetryState(stateP, resW); - return stateP; - } else { - throw resW.left; - } + if ('type' in resW.left && resW.left.type === 'retryable_es_client_error') { + stateP = delayOrResetRetryState(stateP, resW); + return stateP; + } else { + throw resW.left; } } else { stateP = delayOrResetRetryState(stateP, resW); @@ -408,7 +386,7 @@ export const model = ( // index source: Option.none, target: `${stateP.indexPrefix}_${stateP.kibanaVersion}_001`, - targetMappings: mergeMappings( + targetMappings: mergeMigrationMappingPropertyHashes( stateP.targetMappings, indices[aliases[currentAlias(stateP)]].mappings ), @@ -443,7 +421,10 @@ export const model = ( controlState: 'SET_SOURCE_WRITE_BLOCK', source: Option.some(source) as Option.Some, target, - targetMappings: mergeMappings(stateP.targetMappings, indices[source].mappings), + targetMappings: mergeMigrationMappingPropertyHashes( + stateP.targetMappings, + indices[source].mappings + ), versionIndexReadyActions: Option.some([ { remove: { index: source, alias: currentAlias(stateP) /* must_exist: true*/ } }, // TODO: blocked by https://github.com/elastic/elasticsearch/issues/62642 { add: { index: target, alias: currentAlias(stateP) } }, @@ -476,7 +457,7 @@ export const model = ( controlState: 'LEGACY_SET_WRITE_BLOCK', source: Option.some(legacyReindexTarget) as Option.Some, target, - targetMappings: mergeMappings( + targetMappings: mergeMigrationMappingPropertyHashes( stateP.targetMappings, indices[legacyIndex(stateP)].mappings ), @@ -496,7 +477,7 @@ export const model = ( remove: { index: legacyReindexTarget, alias: currentAlias(stateP), - must_exist: true, // TODO: blocked by https://github.com/elastic/elasticsearch/issues/62642 + must_exist: true, }, }, { add: { index: target, 
alias: currentAlias(stateP) } }, @@ -515,11 +496,9 @@ export const model = ( versionIndexReadyActions: Option.some([ { add: { index: target, alias: currentAlias(stateP) } }, { add: { index: target, alias: versionAlias(stateP) } }, - ]), + ]) as Option.Some, }; } - } else { - // return throwBadResponse(res); } return stateP; } else if (stateP.controlState === 'LEGACY_SET_WRITE_BLOCK') { @@ -532,70 +511,133 @@ export const model = ( // If the write block failed because the index doesn't exist, it means // another instance already completed the legacy pre-migration. Proceed // to the next step. - if (res.left === 'index_not_found_exception') { + if (res.left.type === 'index_not_found_exception') { stateP = { ...stateP, controlState: 'LEGACY_CREATE_REINDEX_TARGET' }; - } else if (res.left === 'set_write_block_failed') { + } else if (res.left.type === 'set_write_block_failed') { stateP = delayOrResetRetryState(stateP, res); } - } else { - return throwBadResponse(res); } return stateP; } else if (stateP.controlState === 'LEGACY_CREATE_REINDEX_TARGET') { const res = resW as ResponseType; - if (Either.isLeft(res)) { - // Ignore if legacy index doesn't exist, this probably means another - // Kibana instance already completed the legacy pre-migration and - // deleted it - if (res.left.message === 'index_not_found_exception') { - return delayOrResetRetryState(stateP, Either.right({})); - } else { - return stateP; - } - } else { + if (Either.isRight(res)) { return { ...stateP, controlState: 'LEGACY_REINDEX', }; + } else { + return stateP; } } else if (stateP.controlState === 'LEGACY_REINDEX') { const res = resW as ResponseType; if (Either.isRight(res)) { - return { + stateP = { ...stateP, controlState: 'LEGACY_REINDEX_WAIT_FOR_TASK', - preMigrationUpdateTaskId: res.right.taskId, + legacyReindexTaskId: res.right.taskId, }; - } else { - return stateP; } + return stateP; } else if (stateP.controlState === 'LEGACY_REINDEX_WAIT_FOR_TASK') { const res = resW as ResponseType; - if 
(Either.isRight(res) && Option.isSome(res.right.failures)) { - // TODO: ignore index closed error + if (Either.isRight(res)) { + return { + ...stateP, + controlState: 'LEGACY_DELETE', + }; + } else { + if ( + (res.left.type === 'index_not_found_exception' && res.left.index === stateP.legacy) || + res.left.type === 'target_index_had_write_block' + ) { + // index_not_found_exception for the legacy index, another instance + // already complete the LEGACY_DELETE step. + // + // target_index_had_write_block: another instance already completed the + // SET_SOURCE_WRITE_BLOCK step. + // + // If we detect that another instance has already completed a step, we + // can technically skip ahead in the process until after the completed + // step. However, by not skipping ahead we limit branches in the + // control state progression and simplify the implementation. + return { ...stateP, controlState: 'LEGACY_DELETE' }; + } else if ( + res.left.type === 'index_not_found_exception' && + res.left.index === stateP.source.value + ) { + return { + ...stateP, + controlState: 'FATAL', + logs: [ + ...stateP.logs, + { + level: 'error', + message: `LEGACY_REINDEX failed because the reindex destination index [${stateP.source.value}] does not exist.`, + }, + ], + }; + } return { ...stateP, controlState: 'FATAL' }; } - return { - ...stateP, - controlState: 'LEGACY_DELETE', - }; } else if (stateP.controlState === 'LEGACY_DELETE') { const res = resW as ResponseType; if (Either.isLeft(res)) { - // Ignore if legacy index doesn't exist, this probably means another - // Kibana instance already completed the legacy pre-migration and - // deleted it - if (res.left.message === 'index_not_found_exception') { - return delayOrResetRetryState(stateP, Either.right({})); - } else { - return stateP; + if ( + res.left.type === 'remove_index_not_a_concrete_index' || + (res.left.type === 'index_not_found_exception' && res.left.index === stateP.legacy) + ) { + // index_not_found_exception, another Kibana 
instance already + // deleted the legacy index + // + // remove_index_not_a_concrete_index, another Kibana instance already + // deleted the legacy index and created a .kibana alias + // + // If we detect that another instance has already completed a step, we + // can technically skip ahead in the process until after the completed + // step. However, by not skipping ahead we limit branches in the + // control state progression and simplify the implementation. + return { ...stateP, controlState: 'SET_SOURCE_WRITE_BLOCK' }; + } else if ( + res.left.type === 'index_not_found_exception' && + res.left.index === stateP.source.value + ) { + return { + ...stateP, + controlState: 'FATAL', + logs: [ + ...stateP.logs, + { + level: 'error', + message: `LEGACY_DELETE failed because the source index [${stateP.source.value}] does not exist.`, + }, + ], + }; } } else { return { ...stateP, controlState: 'SET_SOURCE_WRITE_BLOCK' }; } + return stateP; } else if (stateP.controlState === 'SET_SOURCE_WRITE_BLOCK') { - return { ...stateP, controlState: 'CLONE_SOURCE_TO_TARGET' }; + const res = resW as ResponseType; + // If the write block is sucessfully in place, proceed to the next step. + if (Either.isRight(res)) { + stateP = { ...stateP, controlState: 'CLONE_SOURCE_TO_TARGET' }; + } else if (Either.isLeft(res)) { + if (res.left.type === 'index_not_found_exception') { + // If the write block failed because the index doesn't exist, it means + // another instance already completed the legacy pre-migration. Proceed + // to the next step. 
+ stateP = { ...stateP, controlState: 'CLONE_SOURCE_TO_TARGET' }; + } else if (res.left.type === 'set_write_block_failed') { + // If we couldn't add the write block, retry + stateP = delayOrResetRetryState(stateP, res); + } + } else { + return throwBadResponse(res); + } + + return stateP; } else if (stateP.controlState === 'CLONE_SOURCE_TO_TARGET') { return { ...stateP, controlState: 'UPDATE_TARGET_MAPPINGS' }; } else if (stateP.controlState === 'UPDATE_TARGET_MAPPINGS') { @@ -610,7 +652,14 @@ export const model = ( return stateP; } else if (stateP.controlState === 'UPDATE_TARGET_MAPPINGS_WAIT_FOR_TASK') { const res = resW as ResponseType; - if (Either.isRight(res) && Option.isSome(res.right.failures)) { + if ( + Either.isRight(res) && + (Option.isSome(res.right.failures) || Option.isSome(res.right.error)) + ) { + let message = 'Failed to update target mappings: '; + if (Option.isSome(res.right.failures)) message += JSON.stringify(res.right.failures.value); + if (Option.isSome(res.right.error)) message += JSON.stringify(res.right.error); + return { ...stateP, controlState: 'FATAL', @@ -618,8 +667,7 @@ export const model = ( ...stateP.logs, { level: 'error', - message: - 'Failed to update target mappings: ' + JSON.stringify(res.right.failures.value), + message, }, ], }; @@ -662,18 +710,10 @@ export const model = ( controlState: 'OUTDATED_DOCUMENTS_SEARCH', }; } else if (stateP.controlState === 'CREATE_NEW_TARGET') { - if (Option.isSome(stateP.versionIndexReadyActions)) { - return { - ...stateP, - controlState: 'MARK_VERSION_INDEX_READY', - versionIndexReadyActions: stateP.versionIndexReadyActions, - }; - } else { - return { - ...stateP, - controlState: 'DONE', - }; - } + return { + ...stateP, + controlState: 'MARK_VERSION_INDEX_READY', + }; } else if (stateP.controlState === 'MARK_VERSION_INDEX_READY') { // TODO Handle "required alias [.kibana] does not exist" errors blocked by https://github.com/elastic/elasticsearch/issues/62642 return { ...stateP, controlState: 
'DONE' }; @@ -720,9 +760,9 @@ export const nextActionMap = ( LEGACY_REINDEX: (state: LegacyReindexState) => Actions.reindex(client, state.legacy, state.source.value, state.preMigrationScript), LEGACY_SET_WRITE_BLOCK: (state: LegacySetWriteBlockState) => - Actions.setWriteBlock2(client, state.legacy), + Actions.setWriteBlock(client, state.legacy), LEGACY_REINDEX_WAIT_FOR_TASK: (state: LegacyReindexWaitForTaskState) => - Actions.waitForTask(client, state.preMigrationUpdateTaskId, '60s'), + Actions.waitForReindexTask(client, state.legacyReindexTaskId, '60s'), LEGACY_DELETE: (state: LegacyDeleteState) => Actions.updateAliases(client, state.legacyPreMigrationDoneActions), }; @@ -741,12 +781,6 @@ export const next = ( }; }; - const tryCatch = any>( - fn: F - ): (() => ReturnType | Promise) => { - return () => fn().catch((e: EsClientErrors.ElasticsearchClientError) => e); - }; - const map = nextActionMap(client, transformRawDocs); if (state.controlState === 'DONE' || state.controlState === 'FATAL') { @@ -759,7 +793,7 @@ export const next = ( const nextAction = map[state.controlState] as ( state: State ) => ReturnType; - return delay(tryCatch(nextAction(state))); + return delay(nextAction(state)); } };