diff --git a/src/core/server/integration_tests/saved_objects/migrations/actions/actions.test.ts b/src/core/server/integration_tests/saved_objects/migrations/actions/actions.test.ts index 67dd9a54fa26fc..13c3e8c9cff9a3 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/actions/actions.test.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/actions/actions.test.ts @@ -16,7 +16,6 @@ import * as kbnTestServer from '../../../../../test_helpers/kbn_server'; import type { SavedObjectsRawDoc } from '../../../../saved_objects/serialization'; import { bulkOverwriteTransformedDocuments, - cloneIndex, closePit, createIndex, openPit, @@ -25,7 +24,7 @@ import { readWithPit, type ReadWithPit, searchForOutdatedDocuments, - SearchResponse, + type SearchResponse, setWriteBlock, updateAliases, waitForReindexTask, @@ -35,16 +34,17 @@ import { type UpdateByQueryResponse, updateAndPickupMappings, type UpdateAndPickupMappingsResponse, - verifyReindex, removeWriteBlock, transformDocs, - waitForIndexStatusYellow, + waitForIndexStatus, initAction, + cloneIndex, } from '../../../../saved_objects/migrations/actions'; import type { DocumentsTransformFailed, DocumentsTransformSuccess, } from '../../../../saved_objects/migrations/core'; +import { MIGRATION_CLIENT_OPTIONS } from '../../../../saved_objects/migrations/run_resilient_migrator'; const { startES } = kbnTestServer.createTestServers({ adjustTimeout: (t: number) => jest.setTimeout(t), @@ -63,7 +63,7 @@ describe('migration actions', () => { beforeAll(async () => { esServer = await startES(); - client = esServer.es.getClient(); + client = esServer.es.getClient().child(MIGRATION_CLIENT_OPTIONS); // Create test fixture data: await createIndex({ @@ -242,21 +242,21 @@ describe('migration actions', () => { expect.assertions(1); const task = setWriteBlock({ client, index: 'new_index_without_write_block' }); await expect(task()).resolves.toMatchInlineSnapshot(` - Object { - "_tag": "Right", - "right": "set_write_block_succeeded", - } - `); + Object { + "_tag": "Right", + "right": "set_write_block_succeeded", + } + `); }); it('resolves right when setting a write block on an index that already has one', async () => { expect.assertions(1); const task = setWriteBlock({ client, index: 'existing_index_with_write_block' }); await expect(task()).resolves.toMatchInlineSnapshot(` - Object { - "_tag": "Right", - "right": "set_write_block_succeeded", - } - `); + Object { + "_tag": "Right", + "right": "set_write_block_succeeded", + } + `); }); it('once resolved, prevents further writes to the index', async () => { expect.assertions(1); @@ -313,10 +313,10 @@ describe('migration actions', () => { expect.assertions(1); const task = removeWriteBlock({ client, index: 'existing_index_with_write_block_2' }); await expect(task()).resolves.toMatchInlineSnapshot(` - Object { - "_tag": "Right", - "right": "remove_write_block_succeeded", - } + Object { + "_tag": "Right", + "right": "remove_write_block_succeeded", + } `); }); it('resolves right if successful when an index does not have a write block', async () => { @@ -336,7 +336,7 @@ describe('migration actions', () => { }); }); - describe('waitForIndexStatusYellow', () => { + describe('waitForIndexStatus', () => { afterEach(async () => { try { await client.indices.delete({ index: 'red_then_yellow_index' }); @@ -365,9 +365,10 @@ describe('migration actions', () => { ); // Start tracking the index status - const indexStatusPromise = waitForIndexStatusYellow({ + const indexStatusPromise = waitForIndexStatus({ 
        client,
        index: 'red_then_yellow_index',
+       status: 'yellow',
      })();
      const redStatusResponse = await client.cluster.health({ index: 'red_then_yellow_index' });
@@ -405,10 +406,11 @@ describe('migration actions', () => {
        })
        .catch((e) => {});
      // try to wait for index status yellow:
-     const task = waitForIndexStatusYellow({
+     const task = waitForIndexStatus({
        client,
        index: 'red_index',
        timeout: '1s',
+       status: 'yellow',
      });
      await expect(task()).resolves.toMatchInlineSnapshot(`
        Object {
@@ -420,6 +422,39 @@ describe('migration actions', () => {
        }
      `);
    });
+
+   it('resolves left with "index_not_green_timeout" when an index status does not become green within the timeout', async () => {
+     // Create an index with no replicas
+     await client.indices
+       .create({
+         index: 'yellow_index',
+         timeout: '5s',
+         body: {
+           mappings: { properties: {} },
+           settings: {
+             // With no replicas the index turns green as soon as its primary is allocated
+             number_of_replicas: '0',
+           },
+         },
+       })
+       .catch((e) => {});
+     // try to wait for the red index status to become green:
+     const task = waitForIndexStatus({
+       client,
+       index: 'red_index',
+       timeout: '1s',
+       status: 'green',
+     });
+     await expect(task()).resolves.toMatchInlineSnapshot(`
+       Object {
+         "_tag": "Left",
+         "left": Object {
+           "message": "[index_not_green_timeout] Timeout waiting for the status of the [red_index] index to become 'green'",
+           "type": "index_not_green_timeout",
+         },
+       }
+     `);
+   });
  });

  describe('cloneIndex', () => {
@@ -451,19 +486,19 @@ describe('migration actions', () => {
        }
      `);
    });
-   it('resolves right after waiting for index status to be yellow if clone target already existed', async () => {
+   it('resolves right after waiting for index status to become green if clone target already existed', async () => {
      expect.assertions(2);
-     // Create a yellow index
+     // Create a red index that we later turn green
      await client.indices
        .create({
-         index: 'clone_red_then_yellow_index',
+         index: 'clone_red_then_green_index',
          timeout: '5s',
          body: {
            mappings: { properties: {} },
            settings: {
-             // Allocate 1 replica so that this index stays yellow
-             number_of_replicas: '1',
+             // Allocate no replicas so that this index can go green
+             number_of_replicas: '0',
              // Disable all shard allocation so that the index status is red
              index: { routing: { allocation: { enable: 'none' } } },
            },
          },
@@ -475,24 +510,24 @@ describe('migration actions', () => {
      const cloneIndexPromise = cloneIndex({
        client,
        source: 'existing_index_with_write_block',
-       target: 'clone_red_then_yellow_index',
+       target: 'clone_red_then_green_index',
      })();
-     let indexYellow = false;
+     let indexGreen = false;
      setTimeout(() => {
        client.indices.putSettings({
-         index: 'clone_red_then_yellow_index',
+         index: 'clone_red_then_green_index',
          body: {
-           // Enable all shard allocation so that the index status goes yellow
+           // Enable all shard allocation so that the index status goes green
            routing: { allocation: { enable: 'all' } },
          },
        });
-       indexYellow = true;
+       indexGreen = true;
      }, 10);
      await cloneIndexPromise.then((res) => {
        // Assert that the promise didn't resolve before the index became green
-       expect(indexYellow).toBe(true);
+       expect(indexGreen).toBe(true);
        expect(res).toMatchInlineSnapshot(`
          Object {
            "_tag": "Right",
@@ -504,20 +539,7 @@ describe('migration actions', () => {
        `);
      });
    });
-   it('resolves left index_not_found_exception if the source index does not exist', async () => {
-     expect.assertions(1);
-     const task = cloneIndex({ client, source: 'no_such_index', target: 'clone_target_3' });
-     await expect(task()).resolves.toMatchInlineSnapshot(`
-       Object {
"_tag": "Left", - "left": Object { - "index": "no_such_index", - "type": "index_not_found_exception", - }, - } - `); - }); - it('resolves left with a index_not_yellow_timeout if clone target already exists but takes longer than the specified timeout before turning yellow', async () => { + it('resolves left with a index_not_green_timeout if clone target already exists but takes longer than the specified timeout before turning green', async () => { // Create a red index await client.indices .create({ @@ -536,7 +558,7 @@ describe('migration actions', () => { .catch((e) => {}); // Call clone even though the index already exists - const cloneIndexPromise = cloneIndex({ + let cloneIndexPromise = cloneIndex({ client, source: 'existing_index_with_write_block', target: 'clone_red_index', @@ -544,16 +566,16 @@ describe('migration actions', () => { })(); await expect(cloneIndexPromise).resolves.toMatchInlineSnapshot(` - Object { - "_tag": "Left", - "left": Object { - "message": "[index_not_yellow_timeout] Timeout waiting for the status of the [clone_red_index] index to become 'yellow'", - "type": "index_not_yellow_timeout", - }, - } + Object { + "_tag": "Left", + "left": Object { + "message": "[index_not_green_timeout] Timeout waiting for the status of the [clone_red_index] index to become 'green'", + "type": "index_not_green_timeout", + }, + } `); - // Now that we know timeouts work, make the index yellow again and call cloneIndex a second time to verify that it completes + // Now make the index yellow and repeat await client.indices.putSettings({ index: 'clone_red_index', @@ -563,22 +585,63 @@ describe('migration actions', () => { }, }); - // Call clone even though the index already exists with yellow state - const cloneIndexPromise2 = cloneIndex({ + // Call clone even though the index already exists + cloneIndexPromise = cloneIndex({ + client, + source: 'existing_index_with_write_block', + target: 'clone_red_index', + timeout: '1s', + })(); + + await expect(cloneIndexPromise).resolves.toMatchInlineSnapshot(` + Object { + "_tag": "Left", + "left": Object { + "message": "[index_not_green_timeout] Timeout waiting for the status of the [clone_red_index] index to become 'green'", + "type": "index_not_green_timeout", + }, + } + `); + + // Now make the index green and it should succeed + + await client.indices.putSettings({ + index: 'clone_red_index', + body: { + // Set zero replicas so status goes green + number_of_replicas: 0, + }, + }); + + // Call clone even though the index already exists + cloneIndexPromise = cloneIndex({ client, source: 'existing_index_with_write_block', target: 'clone_red_index', timeout: '30s', })(); - await expect(cloneIndexPromise2).resolves.toMatchInlineSnapshot(` - Object { - "_tag": "Right", - "right": Object { - "acknowledged": true, - "shardsAcknowledged": true, - }, - } + await expect(cloneIndexPromise).resolves.toMatchInlineSnapshot(` + Object { + "_tag": "Right", + "right": Object { + "acknowledged": true, + "shardsAcknowledged": true, + }, + } + `); + }); + it('resolves left index_not_found_exception if the source index does not exist', async () => { + expect.assertions(1); + const task = cloneIndex({ client, source: 'no_such_index', target: 'clone_target_3' }); + await expect(task()).resolves.toMatchInlineSnapshot(` + Object { + "_tag": "Left", + "left": Object { + "index": "no_such_index", + "type": "index_not_found_exception", + }, + } `); }); it('resolves left cluster_shard_limit_exceeded when the action would exceed the maximum normal open shards', async () => { 
@@ -614,10 +677,10 @@ describe('migration actions', () => { })()) as Either.Right; const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' }); await expect(task()).resolves.toMatchInlineSnapshot(` - Object { - "_tag": "Right", - "right": "reindex_succeeded", - } + Object { + "_tag": "Right", + "right": "reindex_succeeded", + } `); const results = ( @@ -687,10 +750,10 @@ describe('migration actions', () => { })()) as Either.Right; const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' }); await expect(task()).resolves.toMatchInlineSnapshot(` - Object { - "_tag": "Right", - "right": "reindex_succeeded", - } + Object { + "_tag": "Right", + "right": "reindex_succeeded", + } `); const results = ( (await searchForOutdatedDocuments(client, { @@ -722,11 +785,11 @@ describe('migration actions', () => { })()) as Either.Right; let task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' }); await expect(task()).resolves.toMatchInlineSnapshot(` - Object { - "_tag": "Right", - "right": "reindex_succeeded", - } - `); + Object { + "_tag": "Right", + "right": "reindex_succeeded", + } + `); // reindex without a script res = (await reindex({ @@ -739,11 +802,11 @@ describe('migration actions', () => { })()) as Either.Right; task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' }); await expect(task()).resolves.toMatchInlineSnapshot(` - Object { - "_tag": "Right", - "right": "reindex_succeeded", - } - `); + Object { + "_tag": "Right", + "right": "reindex_succeeded", + } + `); // Assert that documents weren't overridden by the second, unscripted reindex const results = ( @@ -798,11 +861,11 @@ describe('migration actions', () => { })()) as Either.Right; const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' }); await expect(task()).resolves.toMatchInlineSnapshot(` - Object { - "_tag": "Right", - "right": "reindex_succeeded", - } - `); + Object { + "_tag": "Right", + "right": "reindex_succeeded", + } + `); // Assert that existing documents weren't overridden, but that missing // documents were added by the reindex const results = ( @@ -855,13 +918,13 @@ describe('migration actions', () => { const task = waitForReindexTask({ client, taskId: reindexTaskId, timeout: '10s' }); await expect(task()).resolves.toMatchInlineSnapshot(` - Object { - "_tag": "Left", - "left": Object { - "type": "incompatible_mapping_exception", - }, - } - `); + Object { + "_tag": "Left", + "left": Object { + "type": "incompatible_mapping_exception", + }, + } + `); }); it('resolves left incompatible_mapping_exception if all reindex failures are due to a mapper_parsing_exception', async () => { expect.assertions(1); @@ -894,13 +957,13 @@ describe('migration actions', () => { const task = waitForReindexTask({ client, taskId: reindexTaskId, timeout: '10s' }); await expect(task()).resolves.toMatchInlineSnapshot(` - Object { - "_tag": "Left", - "left": Object { - "type": "incompatible_mapping_exception", - }, - } - `); + Object { + "_tag": "Left", + "left": Object { + "type": "incompatible_mapping_exception", + }, + } + `); }); it('resolves left index_not_found_exception if source index does not exist', async () => { expect.assertions(1); @@ -916,14 +979,14 @@ describe('migration actions', () => { })()) as Either.Right; const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' }); await expect(task()).resolves.toMatchInlineSnapshot(` - Object { - "_tag": "Left", - "left": Object { - "index": 
"no_such_index", - "type": "index_not_found_exception", - }, - } - `); + Object { + "_tag": "Left", + "left": Object { + "index": "no_such_index", + "type": "index_not_found_exception", + }, + } + `); }); it('resolves left target_index_had_write_block if all failures are due to a write block', async () => { expect.assertions(1); @@ -939,13 +1002,13 @@ describe('migration actions', () => { const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' }); await expect(task()).resolves.toMatchInlineSnapshot(` - Object { - "_tag": "Left", - "left": Object { - "type": "target_index_had_write_block", - }, - } - `); + Object { + "_tag": "Left", + "left": Object { + "type": "target_index_had_write_block", + }, + } + `); }); it('resolves left if requireAlias=true and the target is not an alias', async () => { expect.assertions(1); @@ -961,20 +1024,21 @@ describe('migration actions', () => { const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' }); await expect(task()).resolves.toMatchInlineSnapshot(` - Object { - "_tag": "Left", - "left": Object { - "index": "existing_index_with_write_block", - "type": "index_not_found_exception", - }, - } - `); + Object { + "_tag": "Left", + "left": Object { + "index": "existing_index_with_write_block", + "type": "index_not_found_exception", + }, + } + `); }); it('resolves left wait_for_task_completion_timeout when the task does not finish within the timeout', async () => { - await waitForIndexStatusYellow({ + await waitForIndexStatus({ client, index: '.kibana_1', + status: 'yellow', })(); const res = (await reindex({ @@ -1001,65 +1065,6 @@ describe('migration actions', () => { }); }); - describe('verifyReindex', () => { - it('resolves right if source and target indices have the same amount of documents', async () => { - expect.assertions(1); - const res = (await reindex({ - client, - sourceIndex: 'existing_index_with_docs', - targetIndex: 'reindex_target_7', - reindexScript: Option.none, - requireAlias: false, - excludeOnUpgradeQuery: { match_all: {} }, - })()) as Either.Right; - await waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' })(); - - const task = verifyReindex({ - client, - sourceIndex: 'existing_index_with_docs', - targetIndex: 'reindex_target_7', - }); - await expect(task()).resolves.toMatchInlineSnapshot(` - Object { - "_tag": "Right", - "right": "verify_reindex_succeeded", - } - `); - }); - it('resolves left if source and target indices have different amount of documents', async () => { - expect.assertions(1); - const task = verifyReindex({ - client, - sourceIndex: 'existing_index_with_docs', - targetIndex: 'existing_index_2', - }); - await expect(task()).resolves.toMatchInlineSnapshot(` - Object { - "_tag": "Left", - "left": Object { - "type": "verify_reindex_failed", - }, - } - `); - }); - it('rejects if source or target index does not exist', async () => { - expect.assertions(2); - let task = verifyReindex({ - client, - sourceIndex: 'no_such_index', - targetIndex: 'existing_index_2', - }); - await expect(task()).rejects.toThrow('index_not_found_exception'); - - task = verifyReindex({ - client, - sourceIndex: 'existing_index_2', - targetIndex: 'no_such_index', - }); - await expect(task()).rejects.toThrow('index_not_found_exception'); - }); - }); - describe('openPit', () => { it('opens PointInTime for an index', async () => { const openPitTask = openPit({ client, index: 'existing_index_with_docs' }); @@ -1372,11 +1377,11 @@ describe('migration actions', () => { }); await 
expect(task()).resolves.toMatchInlineSnapshot(` - Object { - "_tag": "Right", - "right": "pickup_updated_mappings_succeeded", - } - `); + Object { + "_tag": "Right", + "right": "pickup_updated_mappings_succeeded", + } + `); }); }); @@ -1460,14 +1465,14 @@ describe('migration actions', () => { ], }); await expect(task()).resolves.toMatchInlineSnapshot(` - Object { - "_tag": "Left", - "left": Object { - "index": "no_such_index", - "type": "index_not_found_exception", - }, - } - `); + Object { + "_tag": "Left", + "left": Object { + "index": "no_such_index", + "type": "index_not_found_exception", + }, + } + `); }); describe('with must_exist=false', () => { it('resolves left alias_not_found_exception when alias does not exist', async () => { @@ -1484,13 +1489,13 @@ describe('migration actions', () => { ], }); await expect(task()).resolves.toMatchInlineSnapshot(` - Object { - "_tag": "Left", - "left": Object { - "type": "alias_not_found_exception", - }, - } - `); + Object { + "_tag": "Left", + "left": Object { + "type": "alias_not_found_exception", + }, + } + `); }); }); describe('with must_exist=true', () => { @@ -1508,13 +1513,13 @@ describe('migration actions', () => { ], }); await expect(task()).resolves.toMatchInlineSnapshot(` - Object { - "_tag": "Left", - "left": Object { - "type": "alias_not_found_exception", - }, - } - `); + Object { + "_tag": "Left", + "left": Object { + "type": "alias_not_found_exception", + }, + } + `); }); it('resolves left alias_not_found_exception when alias does not exist', async () => { const task = updateAliases({ @@ -1530,13 +1535,13 @@ describe('migration actions', () => { ], }); await expect(task()).resolves.toMatchInlineSnapshot(` - Object { - "_tag": "Left", - "left": Object { - "type": "alias_not_found_exception", - }, - } - `); + Object { + "_tag": "Left", + "left": Object { + "type": "alias_not_found_exception", + }, + } + `); }); }); }); @@ -1553,14 +1558,14 @@ describe('migration actions', () => { ], }); await expect(task()).resolves.toMatchInlineSnapshot(` - Object { - "_tag": "Left", - "left": Object { - "index": "no_such_index", - "type": "index_not_found_exception", - }, - } - `); + Object { + "_tag": "Left", + "left": Object { + "index": "no_such_index", + "type": "index_not_found_exception", + }, + } + `); }); it('left remove_index_not_a_concrete_index when remove_index targets an alias', async () => { const task = updateAliases({ @@ -1574,13 +1579,13 @@ describe('migration actions', () => { ], }); await expect(task()).resolves.toMatchInlineSnapshot(` - Object { - "_tag": "Left", - "left": Object { - "type": "remove_index_not_a_concrete_index", - }, - } - `); + Object { + "_tag": "Left", + "left": Object { + "type": "remove_index_not_a_concrete_index", + }, + } + `); }); }); }); @@ -1591,9 +1596,24 @@ describe('migration actions', () => { await client.cluster.putSettings({ persistent: { cluster: { max_shards_per_node: null } } }); }); afterAll(async () => { - await client.indices.delete({ index: 'red_then_yellow_index' }); + await client.indices.delete({ index: 'red_then_yellow_index' }).catch(); + await client.indices.delete({ index: 'yellow_then_green_index' }).catch(); + await client.indices.delete({ index: 'create_new_index' }).catch(); }); - it('resolves right after waiting for an index status to be yellow if the index already existed', async () => { + it('resolves right after waiting for an index status to become green when cluster state is not propagated within the timeout', async () => { + // By specifying a very short timeout 
Elasticsearch will respond before the shard is allocated
+     const createIndexPromise = createIndex({
+       client,
+       indexName: 'create_new_index',
+       mappings: undefined as any,
+       timeout: '1nanos',
+     })();
+     await expect(createIndexPromise).resolves.toEqual({
+       _tag: 'Right',
+       right: 'create_index_succeeded',
+     });
+   });
+   it('resolves left if an existing index status does not become green', async () => {
      expect.assertions(2);
      // Create a red index
      await client.indices
        .create({
@@ -1606,7 +1626,7 @@ describe('migration actions', () => {
          settings: {
            // Allocate 1 replica so that this index stays yellow
            number_of_replicas: '1',
-           // Disable all shard allocation so that the index status is red
+           // Disable all shard allocation so that the index status starts as red
            index: { routing: { allocation: { enable: 'none' } } },
          },
        },
@@ -1629,16 +1649,68 @@ describe('migration actions', () => {
        client.indices.putSettings({
          index: 'red_then_yellow_index',
          body: {
-           // Disable all shard allocation so that the index status is red
+           // Re-enable allocation so that the status becomes yellow
            routing: { allocation: { enable: 'all' } },
          },
        });
        indexYellow = true;
      }, 10);
+     await createIndexPromise.then((err) => {
+       // Assert that the promise didn't resolve before the index became yellow
+       expect(indexYellow).toBe(true);
+       expect(err).toMatchInlineSnapshot(`
+         Object {
+           "_tag": "Left",
+           "left": Object {
+             "message": "[index_not_green_timeout] Timeout waiting for the status of the [red_then_yellow_index] index to become 'green'",
+             "type": "index_not_green_timeout",
+           },
+         }
+       `);
+     });
+   });
+   it('resolves right after waiting for an existing index status to become green', async () => {
+     expect.assertions(2);
+     // Create a yellow index
+     await client.indices
+       .create({
+         index: 'yellow_then_green_index',
+         timeout: '5s',
+         body: {
+           mappings: { properties: {} },
+           settings: {
+             // Allocate 1 replica so that this index stays yellow
+             number_of_replicas: '1',
+           },
+         },
+       })
+       .catch((e) => {
+         /** ignore */
+       });
+
+     // Call createIndex even though the index already exists
+     const createIndexPromise = createIndex({
+       client,
+       indexName: 'yellow_then_green_index',
+       mappings: undefined as any,
+     })();
+     let indexGreen = false;
+
+     setTimeout(() => {
+       client.indices.putSettings({
+         index: 'yellow_then_green_index',
+         body: {
+           // Set 0 replicas so that this index becomes green
+           number_of_replicas: '0',
+         },
+       });
+       indexGreen = true;
+     }, 10);
+
      await createIndexPromise.then((res) => {
        // Assert that the promise didn't resolve before the index became green
-       expect(indexYellow).toBe(true);
+       expect(indexGreen).toBe(true);
        expect(res).toMatchInlineSnapshot(`
          Object {
            "_tag": "Right",
@@ -1652,7 +1724,7 @@ describe('migration actions', () => {
      await client.cluster.putSettings({ persistent: { cluster: { max_shards_per_node: 1 } } });
      const createIndexPromise = createIndex({
        client,
-       indexName: 'red_then_yellow_index_1',
+       indexName: 'create_index_1',
        mappings: undefined as any,
      })();
      await expect(createIndexPromise).resolves.toMatchInlineSnapshot(`
@@ -1688,10 +1760,10 @@ describe('migration actions', () => {
      });

      await expect(task()).resolves.toMatchInlineSnapshot(`
-       Object {
-         "_tag": "Right",
-         "right": "bulk_index_succeeded",
-       }
+       Object {
+         "_tag": "Right",
+         "right": "bulk_index_succeeded",
+       }
      `);
    });
    it('resolves right even if there were some version_conflict_engine_exception', async () => {
diff --git a/src/core/server/saved_objects/migrations/README.md b/src/core/server/saved_objects/migrations/README.md
index 03bbb0bc731c44..12d3b2d4905832 100644
--- a/src/core/server/saved_objects/migrations/README.md
+++ b/src/core/server/saved_objects/migrations/README.md
@@ -149,7 +149,7 @@ index.
 ### New control state
 1. Two conditions have to be met before migrations begin:
-  1. The Elasticsearch shard allocation cluster setting `cluster.routing.allocation.enable` needs to be unset or set to 'all'. When set to 'primaries', 'new_primaries' or 'none', the migration will timeout when waiting for index yellow status before bulk indexing because the replica cannot be allocated.
+  1. The Elasticsearch shard allocation cluster setting `cluster.routing.allocation.enable` needs to be unset or set to 'all'. When set to 'primaries', 'new_primaries' or 'none', the migration will time out when waiting for index green status before bulk indexing because the replica cannot be allocated.

    As per the Elasticsearch docs https://www.elastic.co/guide/en/elasticsearch/reference/8.2/restart-cluster.html#restart-cluster-rolling when Cloud performs a rolling restart such as during an upgrade, it will temporarily disable shard allocation. Kibana therefore keeps retrying the INIT step to wait for shard allocation to be enabled again.
@@ -182,12 +182,12 @@ and the migration source index is the index the `.kibana` alias points to.
 ### Next action
 `createIndex`

-Create the target index. This operation is idempotent, if the index already exist, we wait until its status turns yellow
+Create the target index. This operation is idempotent: if the index already exists, we wait until its status turns green.

 ### New control state
 1. If the action succeeds
 → `MARK_VERSION_INDEX_READY`
-2. If the action fails with a `index_not_yellow_timeout`
+2. If the action fails with an `index_not_green_timeout`
 → `CREATE_NEW_TARGET`
@@ -219,7 +219,7 @@ saved objects index in 7.4 it will be reindexed into `.kibana_pre7.4.0_001`)
 ### New control state
 1. If the index creation succeeds
 → `LEGACY_REINDEX`
-2. If the index creation task failed with a `index_not_yellow_timeout`
+2. If the index creation task fails with an `index_not_green_timeout`
 → `LEGACY_REINDEX_WAIT_FOR_TASK`
 ## LEGACY_REINDEX
 ### Next action
@@ -261,10 +261,9 @@ new `.kibana` alias that points to `.kibana_pre6.5.0_001`.
 ## WAIT_FOR_YELLOW_SOURCE
 ### Next action
-`waitForIndexStatusYellow`
+`waitForIndexStatus` (status='yellow')

-Wait for the Elasticsearch cluster to be in "yellow" state. It means the index's primary shard is allocated and the index is ready for searching/indexing documents, but ES wasn't able to allocate the replicas.
-We don't have as much data redundancy as we could have, but it's enough to start the migration.
+Wait for the source index to become yellow. This means the index's primary shard has been allocated and is ready for reading/searching. On a multi-node cluster the replicas for this index might not be ready yet, but since we never write to the source index this does not matter.

 ### New control state
 1. If the action succeeds
@@ -285,7 +284,7 @@ Set a write block on the source index to prevent any older Kibana instances from
 ### Next action
 `createIndex`

-This operation is idempotent, if the index already exist, we wait until its status turns yellow.
+This operation is idempotent: if the index already exists, we wait until its status turns green.
 - Because we will be transforming documents before writing them into this index, we can already set the mappings to the target mappings for this version. The source index might contain documents belonging to a disabled plugin.
So set `dynamic: false` mappings for any unknown saved object types. - (Since we never query the temporary index we can potentially disable refresh to speed up indexing performance. Profile to see if gains justify complexity) @@ -293,7 +292,7 @@ This operation is idempotent, if the index already exist, we wait until its stat ### New control state 1. If the action succeeds → `REINDEX_SOURCE_TO_TEMP_OPEN_PIT` -2. If the action fails with a `index_not_yellow_timeout` +2. If the action fails with a `index_not_green_timeout` → `CREATE_REINDEX_TEMP` ## REINDEX_SOURCE_TO_TEMP_OPEN_PIT @@ -368,14 +367,14 @@ Set a write block on the temporary index so that we can clone it. ### Next action `cloneIndex` -Ask elasticsearch to clone the temporary index into the target index. If the target index already exists (because another node already started the clone operation), wait until the clone is complete by waiting for a yellow index status. +Ask elasticsearch to clone the temporary index into the target index. If the target index already exists (because another node already started the clone operation), wait until the clone is complete by waiting for a green index status. We can’t use the temporary index as our target index because one instance can complete the migration, delete a document, and then a second instance starts the reindex operation and re-creates the deleted document. By cloning the temporary index and only accepting writes/deletes from the cloned target index, we prevent lost acknowledged deletes. ### New control state 1. If the action succeeds → `OUTDATED_DOCUMENTS_SEARCH` -2. If the action fails with a `index_not_yellow_timeout` +2. If the action fails with a `index_not_green_timeout` → `CLONE_TEMP_TO_TARGET` ## OUTDATED_DOCUMENTS_SEARCH diff --git a/src/core/server/saved_objects/migrations/actions/check_for_unknown_docs.ts b/src/core/server/saved_objects/migrations/actions/check_for_unknown_docs.ts index a7027c65d011e8..bc101416b9fee4 100644 --- a/src/core/server/saved_objects/migrations/actions/check_for_unknown_docs.ts +++ b/src/core/server/saved_objects/migrations/actions/check_for_unknown_docs.ts @@ -44,8 +44,9 @@ export interface UnknownDocsFound { } /** - * Performs a search in ES, aggregating documents by type, - * retrieving a bunch of documents for each type. + * Performs a search in ES, aggregating documents by type, retrieving a bunch + * of documents for each type. 
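+ *
+ * A sketch of the kind of request this relies on (illustrative only; the
+ * exact query and aggregation names are implementation details, not part
+ * of this change):
+ *
+ * @example
+ * ```ts
+ * client.search({
+ *   index: targetIndices,
+ *   size: 0,
+ *   aggs: { byType: { terms: { field: 'type' }, aggs: { docs: { top_hits: { size: 100 } } } } },
+ * });
+ * ```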
+ *
+ * @internal
+ * @param esClient The ES client to perform the search query
+ * @param targetIndices The ES indices to target
diff --git a/src/core/server/saved_objects/migrations/actions/clone_index.ts b/src/core/server/saved_objects/migrations/actions/clone_index.ts
index 8b0bce96989f27..80b2ff527740cc 100644
--- a/src/core/server/saved_objects/migrations/actions/clone_index.ts
+++ b/src/core/server/saved_objects/migrations/actions/clone_index.ts
@@ -15,8 +15,8 @@ import {
   catchRetryableEsClientErrors,
   RetryableEsClientError,
 } from './catch_retryable_es_client_errors';
-import type { IndexNotFound, AcknowledgeResponse, IndexNotYellowTimeout } from '.';
-import { waitForIndexStatusYellow } from './wait_for_index_status_yellow';
+import type { IndexNotFound, AcknowledgeResponse } from '.';
+import { type IndexNotGreenTimeout, waitForIndexStatus } from './wait_for_index_status';
 import {
   DEFAULT_TIMEOUT,
   INDEX_AUTO_EXPAND_REPLICAS,
@@ -52,7 +52,7 @@ export const cloneIndex = ({
   target,
   timeout = DEFAULT_TIMEOUT,
 }: CloneIndexParams): TaskEither.TaskEither<
-  RetryableEsClientError | IndexNotFound | IndexNotYellowTimeout | ClusterShardLimitExceeded,
+  RetryableEsClientError | IndexNotFound | IndexNotGreenTimeout | ClusterShardLimitExceeded,
   CloneIndexResponse
 > => {
   const cloneTask: TaskEither.TaskEither<
@@ -60,32 +60,29 @@ export const cloneIndex = ({
     AcknowledgeResponse
   > = () => {
     return client.indices
-      .clone(
-        {
-          index: source,
-          target,
-          wait_for_active_shards: WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE,
-          body: {
-            settings: {
-              index: {
-                // The source we're cloning from will have a write block set, so
-                // we need to remove it to allow writes to our newly cloned index
-                'blocks.write': false,
-                number_of_shards: INDEX_NUMBER_OF_SHARDS,
-                auto_expand_replicas: INDEX_AUTO_EXPAND_REPLICAS,
-                // Set an explicit refresh interval so that we don't inherit the
-                // value from incorrectly configured index templates (not required
-                // after we adopt system indices)
-                refresh_interval: '1s',
-                // Bump priority so that recovery happens before newer indices
-                priority: 10,
-              },
+      .clone({
+        index: source,
+        target,
+        wait_for_active_shards: WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE,
+        body: {
+          settings: {
+            index: {
+              // The source we're cloning from will have a write block set, so
+              // we need to remove it to allow writes to our newly cloned index
+              'blocks.write': false,
+              number_of_shards: INDEX_NUMBER_OF_SHARDS,
+              auto_expand_replicas: INDEX_AUTO_EXPAND_REPLICAS,
+              // Set an explicit refresh interval so that we don't inherit the
+              // value from incorrectly configured index templates (not required
+              // after we adopt system indices)
+              refresh_interval: '1s',
+              // Bump priority so that recovery happens before newer indices
+              priority: 10,
            },
          },
-          timeout,
        },
-        { maxRetries: 0 /** handle retry ourselves for now */ }
-      )
+        timeout,
+      })
      .then((response) => {
        /**
         * - acknowledged=false, we timed out before the cluster state was
@@ -136,7 +133,7 @@ export const cloneIndex = ({
    } else {
-     // Otherwise, wait until the target index has a 'yellow' status.
+     // Otherwise, wait until the target index has a 'green' status.
      return pipe(
-       waitForIndexStatusYellow({ client, index: target, timeout }),
+       waitForIndexStatus({ client, index: target, timeout, status: 'green' }),
        TaskEither.map((value) => {
-         /** When the index status is 'yellow' we know that all shards were started */
+         /** When the index status is 'green' we know that all shards were started */
          return { acknowledged: true, shardsAcknowledged: true };
diff --git a/src/core/server/saved_objects/migrations/actions/constants.ts b/src/core/server/saved_objects/migrations/actions/constants.ts
index 5d0d2ffe5d695b..536ae1d2569601 100644
--- a/src/core/server/saved_objects/migrations/actions/constants.ts
+++ b/src/core/server/saved_objects/migrations/actions/constants.ts
@@ -11,6 +11,16 @@
  * Uses the default value of 1000 for Elasticsearch reindex operation.
  */
 export const BATCH_SIZE = 1_000;
+/**
+ * When a request takes a long time to complete and hits the timeout or the
+ * client aborts that request due to the requestTimeout, our only course of
+ * action is to retry that request. This places our request at the end of the
+ * queue and adds more load to Elasticsearch, just making things worse.
+ *
+ * So we want to choose as long a timeout as possible. Some load balancers /
+ * reverse proxies like ELB ignore TCP keep-alive packets, so unless there's a
+ * request or response sent over the socket it will be dropped after 60s.
+ */
 export const DEFAULT_TIMEOUT = '60s';
 /** Allocate 1 replica if there are enough data nodes, otherwise continue with 0 */
 export const INDEX_AUTO_EXPAND_REPLICAS = '0-1';
diff --git a/src/core/server/saved_objects/migrations/actions/create_index.ts b/src/core/server/saved_objects/migrations/actions/create_index.ts
index 3436845f823820..41ee20fc9562db 100644
--- a/src/core/server/saved_objects/migrations/actions/create_index.ts
+++ b/src/core/server/saved_objects/migrations/actions/create_index.ts
@@ -22,7 +22,7 @@ import {
   INDEX_AUTO_EXPAND_REPLICAS,
   WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE,
 } from './constants';
-import { IndexNotYellowTimeout, waitForIndexStatusYellow } from './wait_for_index_status_yellow';
+import { type IndexNotGreenTimeout, waitForIndexStatus } from './wait_for_index_status';
 import { isClusterShardLimitExceeded } from './es_errors';

 function aliasArrayToRecord(aliases: string[]): Record<string, estypes.IndicesAlias> {
@@ -44,6 +44,7 @@ export interface CreateIndexParams {
   indexName: string;
   mappings: IndexMapping;
   aliases?: string[];
+  timeout?: string;
 }
 /**
  * Creates an index with the given mappings
@@ -60,8 +61,9 @@ export const createIndex = ({
   indexName,
   mappings,
   aliases = [],
+  timeout = DEFAULT_TIMEOUT,
 }: CreateIndexParams): TaskEither.TaskEither<
-  RetryableEsClientError | IndexNotYellowTimeout | ClusterShardLimitExceeded,
+  RetryableEsClientError | IndexNotGreenTimeout | ClusterShardLimitExceeded,
   'create_index_succeeded'
 > => {
   const createIndexTask: TaskEither.TaskEither<
@@ -71,36 +73,34 @@ export const createIndex = ({
     const aliasesObject = aliasArrayToRecord(aliases);

     return client.indices
-      .create(
-        {
-          index: indexName,
-          // wait until all shards are available before creating the index
-          // (since number_of_shards=1 this does not have any effect atm)
-          wait_for_active_shards: WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE,
-          // Wait up to 60s for the cluster state to update and all shards to be
-          // started
-          timeout: DEFAULT_TIMEOUT,
-          body: {
-            mappings,
-            aliases: aliasesObject,
-            settings: {
-              index: {
-                // ES rule of thumb: shards should be several GB to 10's of GB, so
-                // Kibana is unlikely to cross that limit.
- number_of_shards: 1, - auto_expand_replicas: INDEX_AUTO_EXPAND_REPLICAS, - // Set an explicit refresh interval so that we don't inherit the - // value from incorrectly configured index templates (not required - // after we adopt system indices) - refresh_interval: '1s', - // Bump priority so that recovery happens before newer indices - priority: 10, - }, + .create({ + index: indexName, + // wait up to timeout until the following shards are available before + // creating the index: primary, replica (only on multi node clusters) + wait_for_active_shards: WAIT_FOR_ALL_SHARDS_TO_BE_ACTIVE, + // Timeout for the cluster state to update and all shards to become + // available. If the request doesn't complete within timeout, + // acknowledged or shards_acknowledged would be false. + timeout, + body: { + mappings, + aliases: aliasesObject, + settings: { + index: { + // ES rule of thumb: shards should be several GB to 10's of GB, so + // Kibana is unlikely to cross that limit. + number_of_shards: 1, + auto_expand_replicas: INDEX_AUTO_EXPAND_REPLICAS, + // Set an explicit refresh interval so that we don't inherit the + // value from incorrectly configured index templates (not required + // after we adopt system indices) + refresh_interval: '1s', + // Bump priority so that recovery happens before newer indices + priority: 10, }, }, }, - { maxRetries: 0 /** handle retry ourselves for now */ } - ) + }) .then((res) => { /** * - acknowledged=false, we timed out before the cluster state was @@ -140,19 +140,25 @@ export const createIndex = ({ return pipe( createIndexTask, TaskEither.chain< - RetryableEsClientError | IndexNotYellowTimeout | ClusterShardLimitExceeded, + RetryableEsClientError | IndexNotGreenTimeout | ClusterShardLimitExceeded, AcknowledgeResponse, 'create_index_succeeded' >((res) => { if (res.acknowledged && res.shardsAcknowledged) { - // If the cluster state was updated and all shards ackd we're done + // If the cluster state was updated and all shards started we're done return TaskEither.right('create_index_succeeded'); } else { - // Otherwise, wait until the target index has a 'yellow' status. + // Otherwise, wait until the target index has a 'green' status meaning + // the primary (and on multi node clusters) the replica has been started return pipe( - waitForIndexStatusYellow({ client, index: indexName, timeout: DEFAULT_TIMEOUT }), + waitForIndexStatus({ + client, + index: indexName, + timeout: DEFAULT_TIMEOUT, + status: 'green', + }), TaskEither.map(() => { - /** When the index status is 'yellow' we know that all shards were started */ + /** When the index status is 'green' we know that all shards were started */ return 'create_index_succeeded'; }) ); diff --git a/src/core/server/saved_objects/migrations/actions/fetch_indices.ts b/src/core/server/saved_objects/migrations/actions/fetch_indices.ts index a88d610a902215..922797f2ba2681 100644 --- a/src/core/server/saved_objects/migrations/actions/fetch_indices.ts +++ b/src/core/server/saved_objects/migrations/actions/fetch_indices.ts @@ -34,18 +34,14 @@ export const fetchIndices = client, indices, }: FetchIndicesParams): TaskEither.TaskEither => - // @ts-expect-error @elastic/elasticsearch IndexState.alias and IndexState.mappings should be required () => { return client.indices - .get( - { - index: indices, - ignore_unavailable: true, // Don't return an error for missing indices. 
Note this *will* include closed indices, the docs are misleading https://github.com/elastic/elasticsearch/issues/63607 - }, - { maxRetries: 0 } - ) + .get({ + index: indices, + ignore_unavailable: true, // Don't return an error for missing indices. Note this *will* include closed indices, the docs are misleading https://github.com/elastic/elasticsearch/issues/63607 + }) .then((body) => { - return Either.right(body); + return Either.right(body as FetchIndexResponse); }) .catch(catchRetryableEsClientErrors); }; diff --git a/src/core/server/saved_objects/migrations/actions/index.ts b/src/core/server/saved_objects/migrations/actions/index.ts index 4ac6bfa24fee69..2b6d501f787b09 100644 --- a/src/core/server/saved_objects/migrations/actions/index.ts +++ b/src/core/server/saved_objects/migrations/actions/index.ts @@ -35,11 +35,12 @@ export { removeWriteBlock } from './remove_write_block'; export type { CloneIndexResponse, CloneIndexParams } from './clone_index'; export { cloneIndex } from './clone_index'; -export type { - WaitForIndexStatusYellowParams, - IndexNotYellowTimeout, -} from './wait_for_index_status_yellow'; -import { IndexNotYellowTimeout, waitForIndexStatusYellow } from './wait_for_index_status_yellow'; +export type { WaitForIndexStatusParams, IndexNotYellowTimeout } from './wait_for_index_status'; +import { + type IndexNotGreenTimeout, + type IndexNotYellowTimeout, + waitForIndexStatus, +} from './wait_for_index_status'; export type { WaitForTaskResponse, WaitForTaskCompletionTimeout } from './wait_for_task'; import { waitForTask, WaitForTaskCompletionTimeout } from './wait_for_task'; @@ -48,7 +49,7 @@ export type { UpdateByQueryResponse } from './pickup_updated_mappings'; import { pickupUpdatedMappings } from './pickup_updated_mappings'; export type { OpenPitResponse, OpenPitParams } from './open_pit'; -export { openPit, pitKeepAlive } from './open_pit'; +export { openPit } from './open_pit'; export type { ReadWithPit, ReadWithPitParams } from './read_with_pit'; export { readWithPit } from './read_with_pit'; @@ -69,9 +70,6 @@ import type { IncompatibleMappingException } from './wait_for_reindex_task'; export { waitForReindexTask } from './wait_for_reindex_task'; -export type { VerifyReindexParams } from './verify_reindex'; -export { verifyReindex } from './verify_reindex'; - import type { AliasNotFound, RemoveIndexNotAConcreteIndex } from './update_aliases'; export type { AliasAction, UpdateAliasesParams } from './update_aliases'; @@ -114,7 +112,7 @@ export type { } from './calculate_exclude_filters'; export { calculateExcludeFilters } from './calculate_exclude_filters'; -export { pickupUpdatedMappings, waitForTask, waitForIndexStatusYellow }; +export { pickupUpdatedMappings, waitForTask, waitForIndexStatus }; export type { AliasNotFound, RemoveIndexNotAConcreteIndex }; export interface IndexNotFound { @@ -153,6 +151,7 @@ export interface ActionErrorTypeMap { request_entity_too_large_exception: RequestEntityTooLargeException; unknown_docs_found: UnknownDocsFound; incompatible_cluster_routing_allocation: IncompatibleClusterRoutingAllocation; + index_not_green_timeout: IndexNotGreenTimeout; index_not_yellow_timeout: IndexNotYellowTimeout; cluster_shard_limit_exceeded: ClusterShardLimitExceeded; } diff --git a/src/core/server/saved_objects/migrations/actions/open_pit.ts b/src/core/server/saved_objects/migrations/actions/open_pit.ts index c17b42d13a8c48..3966198393c219 100644 --- a/src/core/server/saved_objects/migrations/actions/open_pit.ts +++ 
b/src/core/server/saved_objects/migrations/actions/open_pit.ts @@ -25,7 +25,7 @@ export interface OpenPitParams { index: string; } // how long ES should keep PIT alive -export const pitKeepAlive = '10m'; +export const DEFAULT_PIT_KEEP_ALIVE = '10m'; /* * Creates a lightweight view of data when the request has been initiated. * See https://www.elastic.co/guide/en/elasticsearch/reference/current/point-in-time-api.html @@ -39,7 +39,7 @@ export const openPit = return client .openPointInTime({ index, - keep_alive: pitKeepAlive, + keep_alive: DEFAULT_PIT_KEEP_ALIVE, }) .then((response) => Either.right({ pitId: response.id })) .catch(catchRetryableEsClientErrors); diff --git a/src/core/server/saved_objects/migrations/actions/read_with_pit.ts b/src/core/server/saved_objects/migrations/actions/read_with_pit.ts index 10d5ff6bfff886..91e12ddc33c236 100644 --- a/src/core/server/saved_objects/migrations/actions/read_with_pit.ts +++ b/src/core/server/saved_objects/migrations/actions/read_with_pit.ts @@ -15,7 +15,7 @@ import { catchRetryableEsClientErrors, RetryableEsClientError, } from './catch_retryable_es_client_errors'; -import { pitKeepAlive } from './open_pit'; +import { DEFAULT_PIT_KEEP_ALIVE } from './open_pit'; /** @internal */ export interface ReadWithPit { @@ -49,11 +49,18 @@ export const readWithPit = () => { return client .search({ - allow_partial_search_results: false, seq_no_primary_term: seqNoPrimaryTerm, - // Sort fields are required to use searchAfter + // Fail if the index being searched doesn't exist or is closed + // allow_no_indices: false, + // By default ES returns a 200 with partial results if there are shard + // request timeouts or shard failures which can lead to data loss for + // migrations + allow_partial_search_results: false, + // Sort fields are required to use searchAfter so we sort by the + // natural order of the index which is the most efficient option + // as order is not important for the migration sort: '_shard_doc:asc', - pit: { id: pitId, keep_alive: pitKeepAlive }, + pit: { id: pitId, keep_alive: DEFAULT_PIT_KEEP_ALIVE }, size: batchSize, search_after: searchAfter, /** diff --git a/src/core/server/saved_objects/migrations/actions/remove_write_block.ts b/src/core/server/saved_objects/migrations/actions/remove_write_block.ts index e5c64c8385e910..d4e4ad4b8e7c86 100644 --- a/src/core/server/saved_objects/migrations/actions/remove_write_block.ts +++ b/src/core/server/saved_objects/migrations/actions/remove_write_block.ts @@ -33,19 +33,16 @@ export const removeWriteBlock = > => () => { return client.indices - .putSettings( - { - index, - // Don't change any existing settings - preserve_existing: true, - body: { - blocks: { - write: false, - }, + .putSettings({ + index, + // Don't change any existing settings + preserve_existing: true, + body: { + blocks: { + write: false, }, }, - { maxRetries: 0 /** handle retry ourselves for now */ } - ) + }) .then((res) => { return res.acknowledged === true ? 
Either.right('remove_write_block_succeeded' as const) diff --git a/src/core/server/saved_objects/migrations/actions/update_aliases.ts b/src/core/server/saved_objects/migrations/actions/update_aliases.ts index 1a5e487ce9205c..5843599e55afcc 100644 --- a/src/core/server/saved_objects/migrations/actions/update_aliases.ts +++ b/src/core/server/saved_objects/migrations/actions/update_aliases.ts @@ -49,14 +49,11 @@ export const updateAliases = > => () => { return client.indices - .updateAliases( - { - body: { - actions: aliasActions, - }, + .updateAliases({ + body: { + actions: aliasActions, }, - { maxRetries: 0 } - ) + }) .then(() => { // Ignore `acknowledged: false`. When the coordinating node accepts // the new cluster state update but not all nodes have applied the diff --git a/src/core/server/saved_objects/migrations/actions/verify_reindex.ts b/src/core/server/saved_objects/migrations/actions/verify_reindex.ts deleted file mode 100644 index 866ec9974e929d..00000000000000 --- a/src/core/server/saved_objects/migrations/actions/verify_reindex.ts +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. - */ - -import * as Either from 'fp-ts/lib/Either'; -import * as TaskEither from 'fp-ts/lib/TaskEither'; -import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; -import { - catchRetryableEsClientErrors, - RetryableEsClientError, -} from './catch_retryable_es_client_errors'; - -/** @internal */ -export interface VerifyReindexParams { - client: ElasticsearchClient; - sourceIndex: string; - targetIndex: string; -} - -export const verifyReindex = - ({ - client, - sourceIndex, - targetIndex, - }: VerifyReindexParams): TaskEither.TaskEither< - RetryableEsClientError | { type: 'verify_reindex_failed' }, - 'verify_reindex_succeeded' - > => - () => { - const count = (index: string) => - client - .count({ - index, - // Return an error when targeting missing or closed indices - allow_no_indices: false, - }) - .then((res) => { - return res.count; - }); - - return Promise.all([count(sourceIndex), count(targetIndex)]) - .then(([sourceCount, targetCount]) => { - if (targetCount >= sourceCount) { - return Either.right('verify_reindex_succeeded' as const); - } else { - return Either.left({ type: 'verify_reindex_failed' as const }); - } - }) - .catch(catchRetryableEsClientErrors); - }; diff --git a/src/core/server/saved_objects/migrations/actions/wait_for_index_status_yellow.test.ts b/src/core/server/saved_objects/migrations/actions/wait_for_index_status.test.ts similarity index 89% rename from src/core/server/saved_objects/migrations/actions/wait_for_index_status_yellow.test.ts rename to src/core/server/saved_objects/migrations/actions/wait_for_index_status.test.ts index ecff30c595a78c..3a4968be27aa83 100644 --- a/src/core/server/saved_objects/migrations/actions/wait_for_index_status_yellow.test.ts +++ b/src/core/server/saved_objects/migrations/actions/wait_for_index_status.test.ts @@ -7,13 +7,13 @@ */ import { errors as EsErrors } from '@elastic/elasticsearch'; -import { waitForIndexStatusYellow } from './wait_for_index_status_yellow'; +import { waitForIndexStatus } from './wait_for_index_status'; import { elasticsearchClientMock } from 
'@kbn/core-elasticsearch-client-server-mocks';
 import { catchRetryableEsClientErrors } from './catch_retryable_es_client_errors';
 jest.mock('./catch_retryable_es_client_errors');

-describe('waitForIndexStatusYellow', () => {
+describe('waitForIndexStatus', () => {
   beforeEach(() => {
     jest.clearAllMocks();
   });
@@ -31,9 +31,10 @@ describe('waitForIndexStatusYellow', () => {
   );

   it('calls catchRetryableEsClientErrors when the promise rejects', async () => {
-    const task = waitForIndexStatusYellow({
+    const task = waitForIndexStatus({
       client,
       index: 'my_index',
+      status: 'yellow',
     });
     try {
       await task();
diff --git a/src/core/server/saved_objects/migrations/actions/wait_for_index_status.ts b/src/core/server/saved_objects/migrations/actions/wait_for_index_status.ts
new file mode 100644
index 00000000000000..7ee63e75838511
--- /dev/null
+++ b/src/core/server/saved_objects/migrations/actions/wait_for_index_status.ts
@@ -0,0 +1,103 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+import * as Either from 'fp-ts/lib/Either';
+import * as TaskEither from 'fp-ts/lib/TaskEither';
+import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
+import {
+  catchRetryableEsClientErrors,
+  RetryableEsClientError,
+} from './catch_retryable_es_client_errors';
+import { DEFAULT_TIMEOUT } from './constants';
+
+/** @internal */
+export interface WaitForIndexStatusParams {
+  client: ElasticsearchClient;
+  index: string;
+  timeout?: string;
+  status: 'yellow' | 'green';
+}
+
+export interface IndexNotYellowTimeout {
+  type: 'index_not_yellow_timeout';
+  message: string;
+}
+
+export interface IndexNotGreenTimeout {
+  type: 'index_not_green_timeout';
+  message: string;
+}
+
+export function waitForIndexStatus({
+  client,
+  index,
+  timeout,
+  status,
+}: WaitForIndexStatusParams & { status: 'yellow' }): TaskEither.TaskEither<
+  RetryableEsClientError | IndexNotYellowTimeout,
+  {}
+>;
+
+export function waitForIndexStatus({
+  client,
+  index,
+  timeout,
+  status,
+}: WaitForIndexStatusParams & { status: 'green' }): TaskEither.TaskEither<
+  RetryableEsClientError | IndexNotGreenTimeout,
+  {}
+>;
+
+/**
+ * Wait until an index status becomes either 'yellow' or 'green'.
+ *
+ * A yellow index status means the index's primary shard was allocated but ES
+ * wasn't able to allocate the replica. Thus a yellow index can be searched
+ * and read from but indexing documents with `wait_for_active_shards='all'`
+ * will fail.
+ *
+ * A green index status means the index's primary and replica shards have been
+ * allocated so we can search, read and index documents with
+ * `wait_for_active_shards='all'`.
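+ *
+ * @example
+ * A hypothetical usage sketch (not part of this change); calling the
+ * returned task resolves with a Right on success or a Left on timeout:
+ * ```ts
+ * const task = waitForIndexStatus({ client, index: '.kibana_1', status: 'green' });
+ * const res = await task(); // Either.right({}) or Either.left(index_not_green_timeout)
+ * ```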
+ */ +export function waitForIndexStatus({ + client, + index, + timeout = DEFAULT_TIMEOUT, + status, +}: WaitForIndexStatusParams): TaskEither.TaskEither< + RetryableEsClientError | IndexNotYellowTimeout | IndexNotGreenTimeout, + {} +> { + return () => { + return client.cluster + .health( + { + index, + wait_for_status: status, + timeout, + }, + { + /* Don't reject on status code 408 so that we can handle the timeout + * explicitly with a custom response type and provide more context in the error message + */ + ignore: [408], + } + ) + .then((res) => { + if (res.timed_out === true) { + return Either.left({ + type: `index_not_${status}_timeout` as const, + message: `[index_not_${status}_timeout] Timeout waiting for the status of the [${index}] index to become '${status}'`, + }); + } + return Either.right({}); + }) + .catch(catchRetryableEsClientErrors); + }; +} diff --git a/src/core/server/saved_objects/migrations/actions/wait_for_index_status_yellow.ts b/src/core/server/saved_objects/migrations/actions/wait_for_index_status_yellow.ts deleted file mode 100644 index a306c0d2d058c2..00000000000000 --- a/src/core/server/saved_objects/migrations/actions/wait_for_index_status_yellow.ts +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. - */ - -import * as Either from 'fp-ts/lib/Either'; -import * as TaskEither from 'fp-ts/lib/TaskEither'; -import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; -import { - catchRetryableEsClientErrors, - RetryableEsClientError, -} from './catch_retryable_es_client_errors'; -import { DEFAULT_TIMEOUT } from './constants'; - -/** @internal */ -export interface WaitForIndexStatusYellowParams { - client: ElasticsearchClient; - index: string; - timeout?: string; -} - -export interface IndexNotYellowTimeout { - type: 'index_not_yellow_timeout'; - message: string; -} -/** - * A yellow index status means the index's primary shard is allocated and the - * index is ready for searching/indexing documents, but ES wasn't able to - * allocate the replicas. When migrations proceed with a yellow index it means - * we don't have as much data-redundancy as we could have, but waiting for - * replicas would mean that v2 migrations fail where v1 migrations would have - * succeeded. It doesn't feel like it's Kibana's job to force users to keep - * their clusters green and even if it's green when we migrate it can turn - * yellow at any point in the future. So ultimately data-redundancy is up to - * users to maintain. 
diff --git a/src/core/server/saved_objects/migrations/actions/wait_for_index_status_yellow.ts b/src/core/server/saved_objects/migrations/actions/wait_for_index_status_yellow.ts
deleted file mode 100644
index a306c0d2d058c2..00000000000000
--- a/src/core/server/saved_objects/migrations/actions/wait_for_index_status_yellow.ts
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0 and the Server Side Public License, v 1; you may not use this file except
- * in compliance with, at your election, the Elastic License 2.0 or the Server
- * Side Public License, v 1.
- */
-
-import * as Either from 'fp-ts/lib/Either';
-import * as TaskEither from 'fp-ts/lib/TaskEither';
-import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
-import {
-  catchRetryableEsClientErrors,
-  RetryableEsClientError,
-} from './catch_retryable_es_client_errors';
-import { DEFAULT_TIMEOUT } from './constants';
-
-/** @internal */
-export interface WaitForIndexStatusYellowParams {
-  client: ElasticsearchClient;
-  index: string;
-  timeout?: string;
-}
-
-export interface IndexNotYellowTimeout {
-  type: 'index_not_yellow_timeout';
-  message: string;
-}
-/**
- * A yellow index status means the index's primary shard is allocated and the
- * index is ready for searching/indexing documents, but ES wasn't able to
- * allocate the replicas. When migrations proceed with a yellow index it means
- * we don't have as much data-redundancy as we could have, but waiting for
- * replicas would mean that v2 migrations fail where v1 migrations would have
- * succeeded. It doesn't feel like it's Kibana's job to force users to keep
- * their clusters green and even if it's green when we migrate it can turn
- * yellow at any point in the future. So ultimately data-redundancy is up to
- * users to maintain.
- */
-export const waitForIndexStatusYellow =
-  ({
-    client,
-    index,
-    timeout = DEFAULT_TIMEOUT,
-  }: WaitForIndexStatusYellowParams): TaskEither.TaskEither<
-    RetryableEsClientError | IndexNotYellowTimeout,
-    {}
-  > =>
-  () => {
-    return client.cluster
-      .health(
-        {
-          index,
-          wait_for_status: 'yellow',
-          timeout,
-        },
-        // Don't reject on status code 408 so that we can handle the timeout
-        // explicitly with a custom response type and provide more context in the error message
-        { ignore: [408] }
-      )
-      .then((res) => {
-        if (res.timed_out === true) {
-          return Either.left({
-            type: 'index_not_yellow_timeout' as const,
-            message: `[index_not_yellow_timeout] Timeout waiting for the status of the [${index}] index to become 'yellow'`,
-          });
-        }
-        return Either.right({});
-      })
-      .catch(catchRetryableEsClientErrors);
-  };
diff --git a/src/core/server/saved_objects/migrations/kibana_migrator.test.ts b/src/core/server/saved_objects/migrations/kibana_migrator.test.ts
index 926b6efcf18037..df6ddb9624abeb 100644
--- a/src/core/server/saved_objects/migrations/kibana_migrator.test.ts
+++ b/src/core/server/saved_objects/migrations/kibana_migrator.test.ts
@@ -247,6 +247,9 @@ const mockV2MigrationOptions = () => {
 };
 
 const mockOptions = () => {
+  const mockedClient = elasticsearchClientMock.createElasticsearchClient();
+  (mockedClient as any).child = jest.fn().mockImplementation(() => mockedClient);
+
   const options: MockedOptions = {
     logger: loggingSystemMock.create().get(),
     kibanaVersion: '8.2.3',
@@ -284,7 +287,7 @@ const mockOptions = () => {
       skip: false,
       retryAttempts: 20,
     },
-    client: elasticsearchClientMock.createElasticsearchClient(),
+    client: mockedClient,
     docLinks: docLinksServiceMock.createSetupContract(),
   };
   return options;
diff --git a/src/core/server/saved_objects/migrations/model/model.test.ts b/src/core/server/saved_objects/migrations/model/model.test.ts
index b61fdc2c8ec8c7..eade744a004694 100644
--- a/src/core/server/saved_objects/migrations/model/model.test.ts
+++ b/src/core/server/saved_objects/migrations/model/model.test.ts
@@ -596,10 +596,10 @@ describe('migrations v2 model', () => {
       expect(newState.retryCount).toEqual(0);
       expect(newState.retryDelay).toEqual(0);
     });
-    test('LEGACY_CREATE_REINDEX_TARGET -> LEGACY_CREATE_REINDEX_TARGET if action fails with index_not_yellow_timeout', () => {
+    test('LEGACY_CREATE_REINDEX_TARGET -> LEGACY_CREATE_REINDEX_TARGET if action fails with index_not_green_timeout', () => {
       const res: ResponseType<'LEGACY_CREATE_REINDEX_TARGET'> = Either.left({
-        message: '[index_not_yellow_timeout] Timeout waiting for ...',
-        type: 'index_not_yellow_timeout',
+        message: '[index_not_green_timeout] Timeout waiting for ...',
+        type: 'index_not_green_timeout',
       });
       const newState = model(legacyCreateReindexTargetState, res);
       expect(newState.controlState).toEqual('LEGACY_CREATE_REINDEX_TARGET');
@@ -608,7 +608,7 @@ describe('migrations v2 model', () => {
       expect(newState.logs[0]).toMatchInlineSnapshot(`
         Object {
           "level": "error",
-          "message": "Action failed with '[index_not_yellow_timeout] Timeout waiting for ... Refer to repeatedTimeoutRequests for information on how to resolve the issue.'. Retrying attempt 1 in 2 seconds.",
+          "message": "Action failed with '[index_not_green_timeout] Timeout waiting for ... Refer to repeatedTimeoutRequests for information on how to resolve the issue.'. Retrying attempt 1 in 2 seconds.",
         }
       `);
     });
@@ -1049,10 +1049,10 @@ describe('migrations v2 model', () => {
      expect(newState.retryCount).toEqual(0);
      expect(newState.retryDelay).toEqual(0);
    });
-    it('CREATE_REINDEX_TEMP -> CREATE_REINDEX_TEMP if action fails with index_not_yellow_timeout', () => {
+    it('CREATE_REINDEX_TEMP -> CREATE_REINDEX_TEMP if action fails with index_not_green_timeout', () => {
      const res: ResponseType<'CREATE_REINDEX_TEMP'> = Either.left({
-        message: '[index_not_yellow_timeout] Timeout waiting for ...',
-        type: 'index_not_yellow_timeout',
+        message: '[index_not_green_timeout] Timeout waiting for ...',
+        type: 'index_not_green_timeout',
      });
      const newState = model(state, res);
      expect(newState.controlState).toEqual('CREATE_REINDEX_TEMP');
@@ -1061,7 +1061,7 @@ describe('migrations v2 model', () => {
      expect(newState.logs[0]).toMatchInlineSnapshot(`
        Object {
          "level": "error",
-          "message": "Action failed with '[index_not_yellow_timeout] Timeout waiting for ... Refer to repeatedTimeoutRequests for information on how to resolve the issue.'. Retrying attempt 1 in 2 seconds.",
+          "message": "Action failed with '[index_not_green_timeout] Timeout waiting for ... Refer to repeatedTimeoutRequests for information on how to resolve the issue.'. Retrying attempt 1 in 2 seconds.",
        }
      `);
    });
@@ -1434,10 +1434,10 @@ describe('migrations v2 model', () => {
      expect(newState.retryCount).toBe(0);
      expect(newState.retryDelay).toBe(0);
    });
-    it('CLONE_TEMP_TO_TARGET -> CLONE_TEMP_TO_TARGET if action fails with index_not_yellow_timeout', () => {
+    it('CLONE_TEMP_TO_TARGET -> CLONE_TEMP_TO_TARGET if action fails with index_not_green_timeout', () => {
      const res: ResponseType<'CLONE_TEMP_TO_TARGET'> = Either.left({
-        message: '[index_not_yellow_timeout] Timeout waiting for ...',
-        type: 'index_not_yellow_timeout',
+        message: '[index_not_green_timeout] Timeout waiting for ...',
+        type: 'index_not_green_timeout',
      });
      const newState = model(state, res);
      expect(newState.controlState).toEqual('CLONE_TEMP_TO_TARGET');
@@ -1446,7 +1446,7 @@ describe('migrations v2 model', () => {
      expect(newState.logs[0]).toMatchInlineSnapshot(`
        Object {
          "level": "error",
-          "message": "Action failed with '[index_not_yellow_timeout] Timeout waiting for ... Refer to repeatedTimeoutRequests for information on how to resolve the issue.'. Retrying attempt 1 in 2 seconds.",
+          "message": "Action failed with '[index_not_green_timeout] Timeout waiting for ... Refer to repeatedTimeoutRequests for information on how to resolve the issue.'. Retrying attempt 1 in 2 seconds.",
        }
      `);
    });
@@ -1963,10 +1963,10 @@ describe('migrations v2 model', () => {
      expect(newState.retryCount).toEqual(0);
      expect(newState.retryDelay).toEqual(0);
    });
-    test('CREATE_NEW_TARGET -> CREATE_NEW_TARGET if action fails with index_not_yellow_timeout', () => {
+    test('CREATE_NEW_TARGET -> CREATE_NEW_TARGET if action fails with index_not_green_timeout', () => {
      const res: ResponseType<'CREATE_NEW_TARGET'> = Either.left({
-        message: '[index_not_yellow_timeout] Timeout waiting for ...',
-        type: 'index_not_yellow_timeout',
+        message: '[index_not_green_timeout] Timeout waiting for ...',
+        type: 'index_not_green_timeout',
      });
      const newState = model(createNewTargetState, res);
      expect(newState.controlState).toEqual('CREATE_NEW_TARGET');
diff --git a/src/core/server/saved_objects/migrations/model/model.ts b/src/core/server/saved_objects/migrations/model/model.ts
index 9ab668b547e21b..8eca6d12c4f0ad 100644
--- a/src/core/server/saved_objects/migrations/model/model.ts
+++ b/src/core/server/saved_objects/migrations/model/model.ts
@@ -241,8 +241,8 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
     const res = resW as ExcludeRetryableEsError<ResponseType<typeof stateP.controlState>>;
     if (Either.isLeft(res)) {
       const left = res.left;
-      if (isTypeof(left, 'index_not_yellow_timeout')) {
-        // `index_not_yellow_timeout` for the LEGACY_CREATE_REINDEX_TARGET source index:
+      if (isTypeof(left, 'index_not_green_timeout')) {
+        // `index_not_green_timeout` for the LEGACY_CREATE_REINDEX_TARGET source index:
         // A yellow status timeout could theoretically be temporary for a busy cluster
         // that takes a long time to allocate the primary and we retry the action to see if
         // we get a response.
@@ -485,10 +485,10 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
     return { ...stateP, controlState: 'REINDEX_SOURCE_TO_TEMP_OPEN_PIT' };
   } else if (Either.isLeft(res)) {
     const left = res.left;
-    if (isTypeof(left, 'index_not_yellow_timeout')) {
-      // `index_not_yellow_timeout` for the CREATE_REINDEX_TEMP target temp index:
-      // The index status did not go yellow within the specified timeout period.
-      // A yellow status timeout could theoretically be temporary for a busy cluster.
+    if (isTypeof(left, 'index_not_green_timeout')) {
+      // `index_not_green_timeout` for the CREATE_REINDEX_TEMP target temp index:
+      // The index status did not go green within the specified timeout period.
+      // A green status timeout could theoretically be temporary for a busy cluster.
       //
       // If there is a problem CREATE_REINDEX_TEMP action will
       // continue to timeout and eventually lead to a failed migration.
@@ -753,9 +753,9 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
        ...stateP,
        controlState: 'REFRESH_TARGET',
      };
-    } else if (isTypeof(left, 'index_not_yellow_timeout')) {
-      // `index_not_yellow_timeout` for the CLONE_TEMP_TO_TARGET source -> target index:
-      // The target index status did not go yellow within the specified timeout period.
+    } else if (isTypeof(left, 'index_not_green_timeout')) {
+      // `index_not_green_timeout` for the CLONE_TEMP_TO_TARGET source -> target index:
+      // The target index status did not go green within the specified timeout period.
       // The cluster could just be busy and we retry the action.
       // Once we run out of retries, the migration fails.
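The model.ts hunks above all funnel an `index_not_green_timeout` left into the same behaviour: stay in the current control state and retry with exponential back-off. A minimal, self-contained sketch of that pattern, not part of the patch — `RetryState` and `delayRetryState` here are simplified stand-ins, not the real model.ts helpers:

```ts
import * as Either from 'fp-ts/lib/Either';

interface RetryState {
  controlState: string;
  retryCount: number;
  retryDelay: number;
  logs: Array<{ level: 'error'; message: string }>;
}

interface IndexNotGreenTimeout {
  type: 'index_not_green_timeout';
  message: string;
}

// Simplified stand-in for the model's retry helper: keep the same control
// state, bump the retry counters with capped exponential back-off, and log.
function delayRetryState(state: RetryState, errorMessage: string): RetryState {
  const retryCount = state.retryCount + 1;
  const retryDelay = 1000 * Math.min(2 ** retryCount, 64); // caps at 64 seconds
  return {
    ...state,
    retryCount,
    retryDelay,
    logs: [
      ...state.logs,
      {
        level: 'error',
        message: `Action failed with '${errorMessage}'. Retrying attempt ${retryCount} in ${retryDelay / 1000} seconds.`,
      },
    ],
  };
}

const state: RetryState = {
  controlState: 'CREATE_REINDEX_TEMP',
  retryCount: 0,
  retryDelay: 0,
  logs: [],
};

const res: Either.Either<IndexNotGreenTimeout, {}> = Either.left({
  type: 'index_not_green_timeout',
  message: '[index_not_green_timeout] Timeout waiting for ...',
});

// A left timeout response re-enters the same control state with back-off;
// a right response would reset retryCount/retryDelay to 0 instead.
const nextState = Either.isLeft(res)
  ? delayRetryState(state, res.left.message)
  : { ...state, retryCount: 0, retryDelay: 0 };
```

With `retryCount` going from 0 to 1 the delay is 2 seconds, which matches the `Retrying attempt 1 in 2 seconds.` inline snapshots asserted above.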
@@ -1019,8 +1019,8 @@ export const model = (currentState: State, resW: ResponseType<AllActionStates>):
      };
    } else if (Either.isLeft(res)) {
      const left = res.left;
-      if (isTypeof(left, 'index_not_yellow_timeout')) {
-        // `index_not_yellow_timeout` for the CREATE_NEW_TARGET target index:
+      if (isTypeof(left, 'index_not_green_timeout')) {
+        // `index_not_green_timeout` for the CREATE_NEW_TARGET target index:
        // The cluster might just be busy and we retry the action for a set number of times.
        // If the cluster hit the low watermark for disk usage the action will continue to timeout.
        // Unless the disk space is addressed, the LEGACY_CREATE_REINDEX_TARGET action will
diff --git a/src/core/server/saved_objects/migrations/next.ts b/src/core/server/saved_objects/migrations/next.ts
index 7d73d4830259bf..9ac29a3a849ba7 100644
--- a/src/core/server/saved_objects/migrations/next.ts
+++ b/src/core/server/saved_objects/migrations/next.ts
@@ -61,7 +61,7 @@ export const nextActionMap = (client: ElasticsearchClient, transformRawDocs: Tra
    INIT: (state: InitState) =>
      Actions.initAction({ client, indices: [state.currentAlias, state.versionAlias] }),
    WAIT_FOR_YELLOW_SOURCE: (state: WaitForYellowSourceState) =>
-      Actions.waitForIndexStatusYellow({ client, index: state.sourceIndex.value }),
+      Actions.waitForIndexStatus({ client, index: state.sourceIndex.value, status: 'yellow' }),
    CHECK_UNKNOWN_DOCUMENTS: (state: CheckUnknownDocumentsState) =>
      Actions.checkForUnknownDocs({
        client,
diff --git a/src/core/server/saved_objects/migrations/run_resilient_migrator.ts b/src/core/server/saved_objects/migrations/run_resilient_migrator.ts
index 337947c33b989c..ce8f9d921e2c65 100644
--- a/src/core/server/saved_objects/migrations/run_resilient_migrator.ts
+++ b/src/core/server/saved_objects/migrations/run_resilient_migrator.ts
@@ -20,6 +20,20 @@ import { migrationStateActionMachine } from './migrations_state_action_machine';
 import { SavedObjectsMigrationConfigType } from '../saved_objects_config';
 import type { ISavedObjectTypeRegistry } from '../saved_objects_type_registry';
 
+/**
+ * To avoid the Elasticsearch-js client aborting our requests before we
+ * receive a response from Elasticsearch we choose a requestTimeout that's
+ * longer than the DEFAULT_TIMEOUT.
+ *
+ * This timeout is only really valuable for preventing migrations from being
+ * stuck waiting forever for a response when the underlying socket is broken.
+ *
+ * We also set maxRetries to 0 so that the state action machine can handle all
+ * retries. This way we get exponential back-off and logging for failed
+ * actions.
+ */
+export const MIGRATION_CLIENT_OPTIONS = { maxRetries: 0, requestTimeout: 120_000 };
+
 /**
  * Migrates the provided indexPrefix index using a resilient algorithm that is
  * completely lock-free so that any failure can always be retried by
@@ -61,11 +75,12 @@ export async function runResilientMigrator({
     docLinks,
     logger,
   });
+  const migrationClient = client.child(MIGRATION_CLIENT_OPTIONS);
   return migrationStateActionMachine({
     initialState,
     logger,
-    next: next(client, transformRawDocs),
+    next: next(migrationClient, transformRawDocs),
     model,
-    client,
+    client: migrationClient,
   });
 }
diff --git a/src/core/server/saved_objects/migrations/state.ts b/src/core/server/saved_objects/migrations/state.ts
index 02da8495c0d8af..c6f3273a18d47a 100644
--- a/src/core/server/saved_objects/migrations/state.ts
+++ b/src/core/server/saved_objects/migrations/state.ts
@@ -182,7 +182,7 @@ export interface FatalState extends BaseState {
 }
 
 export interface WaitForYellowSourceState extends BaseState {
-  /** Wait for the source index to be yellow before requesting it. */
+  /** Wait for the source index to be yellow before reading from it. */
   readonly controlState: 'WAIT_FOR_YELLOW_SOURCE';
   readonly sourceIndex: Option.Some<string>;
   readonly sourceIndexMappings: IndexMapping;
diff --git a/src/plugins/data_view_editor/public/components/data_view_editor_flyout_content.tsx b/src/plugins/data_view_editor/public/components/data_view_editor_flyout_content.tsx
index 3c1305d8e7860a..c9baa374ed1de3 100644
--- a/src/plugins/data_view_editor/public/components/data_view_editor_flyout_content.tsx
+++ b/src/plugins/data_view_editor/public/components/data_view_editor_flyout_content.tsx
@@ -410,7 +410,12 @@ const IndexPatternEditorFlyoutContentComponent = ({
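The run_resilient_migrator.ts hunks derive a dedicated child client for migration requests. A rough sketch of that pattern outside Kibana, not part of the patch — the node URL and index name are placeholders:

```ts
import { Client } from '@elastic/elasticsearch';

async function waitForGreen() {
  const client = new Client({ node: 'http://localhost:9200' }); // placeholder node

  // maxRetries: 0 delegates all retrying to the migration state machine
  // (which adds exponential back-off and logging); the long requestTimeout
  // keeps the client from aborting requests Elasticsearch is still serving.
  const migrationClient = client.child({ maxRetries: 0, requestTimeout: 120_000 });

  return migrationClient.cluster.health(
    { index: '.kibana_1', wait_for_status: 'green', timeout: '60s' }, // placeholder index
    { ignore: [408] } // handle the 408 timeout explicitly, as waitForIndexStatus does
  );
}
```

Because `child()` shares the parent's connection pool, other consumers of the original client keep the default retry behaviour while only migration requests get the stricter settings.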