diff --git a/src/core/server/saved_objects/migrationsv2/actions/index.test.ts b/src/core/server/saved_objects/migrationsv2/actions/index.test.ts index df74a4e1282e43..159bf6b042f8a7 100644 --- a/src/core/server/saved_objects/migrationsv2/actions/index.test.ts +++ b/src/core/server/saved_objects/migrationsv2/actions/index.test.ts @@ -37,7 +37,7 @@ describe('actions', () => { describe('fetchIndices', () => { it('calls catchRetryableEsClientErrors when the promise rejects', async () => { - const task = Actions.fetchIndices(client, ['my_index']); + const task = Actions.fetchIndices({ client, indicesToFetch: ['my_index'] }); try { await task(); } catch (e) { @@ -49,7 +49,7 @@ describe('actions', () => { describe('setWriteBlock', () => { it('calls catchRetryableEsClientErrors when the promise rejects', async () => { - const task = Actions.setWriteBlock(client, 'my_index'); + const task = Actions.setWriteBlock({ client, index: 'my_index' }); try { await task(); } catch (e) { @@ -58,7 +58,10 @@ describe('actions', () => { expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); }); it('re-throws non retry-able errors', async () => { - const task = Actions.setWriteBlock(clientWithNonRetryableError, 'my_index'); + const task = Actions.setWriteBlock({ + client: clientWithNonRetryableError, + index: 'my_index', + }); await task(); expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(nonRetryableError); }); @@ -66,7 +69,11 @@ describe('actions', () => { describe('cloneIndex', () => { it('calls catchRetryableEsClientErrors when the promise rejects', async () => { - const task = Actions.cloneIndex(client, 'my_source_index', 'my_target_index'); + const task = Actions.cloneIndex({ + client, + source: 'my_source_index', + target: 'my_target_index', + }); try { await task(); } catch (e) { @@ -75,7 +82,10 @@ describe('actions', () => { expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); }); it('re-throws non retry-able errors', async () => { - const task = Actions.setWriteBlock(clientWithNonRetryableError, 'my_index'); + const task = Actions.setWriteBlock({ + client: clientWithNonRetryableError, + index: 'my_index', + }); await task(); expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(nonRetryableError); }); @@ -95,7 +105,7 @@ describe('actions', () => { describe('openPit', () => { it('calls catchRetryableEsClientErrors when the promise rejects', async () => { - const task = Actions.openPit(client, 'my_index'); + const task = Actions.openPit({ client, index: 'my_index' }); try { await task(); } catch (e) { @@ -107,7 +117,12 @@ describe('actions', () => { describe('readWithPit', () => { it('calls catchRetryableEsClientErrors when the promise rejects', async () => { - const task = Actions.readWithPit(client, 'pitId', { match_all: {} }, 10_000); + const task = Actions.readWithPit({ + client, + pitId: 'pitId', + query: { match_all: {} }, + batchSize: 10_000, + }); try { await task(); } catch (e) { @@ -119,7 +134,7 @@ describe('actions', () => { describe('closePit', () => { it('calls catchRetryableEsClientErrors when the promise rejects', async () => { - const task = Actions.closePit(client, 'pitId'); + const task = Actions.closePit({ client, pitId: 'pitId' }); try { await task(); } catch (e) { @@ -131,14 +146,14 @@ describe('actions', () => { describe('reindex', () => { it('calls catchRetryableEsClientErrors when the promise rejects', async () => { - const task = Actions.reindex( + const task = Actions.reindex({ client, - 'my_source_index', - 'my_target_index', - Option.none, - false, - {} - ); + sourceIndex: 'my_source_index', + targetIndex: 'my_target_index', + reindexScript: Option.none, + requireAlias: false, + unusedTypesQuery: {}, + }); try { await task(); } catch (e) { @@ -150,7 +165,7 @@ describe('actions', () => { describe('waitForReindexTask', () => { it('calls catchRetryableEsClientErrors when the promise rejects', async () => { - const task = Actions.waitForReindexTask(client, 'my task id', '60s'); + const task = Actions.waitForReindexTask({ client, taskId: 'my task id', timeout: '60s' }); try { await task(); } catch (e) { @@ -160,7 +175,10 @@ describe('actions', () => { expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); }); it('re-throws non retry-able errors', async () => { - const task = Actions.setWriteBlock(clientWithNonRetryableError, 'my_index'); + const task = Actions.setWriteBlock({ + client: clientWithNonRetryableError, + index: 'my_index', + }); await task(); expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(nonRetryableError); }); @@ -168,7 +186,11 @@ describe('actions', () => { describe('waitForPickupUpdatedMappingsTask', () => { it('calls catchRetryableEsClientErrors when the promise rejects', async () => { - const task = Actions.waitForPickupUpdatedMappingsTask(client, 'my task id', '60s'); + const task = Actions.waitForPickupUpdatedMappingsTask({ + client, + taskId: 'my task id', + timeout: '60s', + }); try { await task(); } catch (e) { @@ -178,7 +200,10 @@ describe('actions', () => { expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); }); it('re-throws non retry-able errors', async () => { - const task = Actions.setWriteBlock(clientWithNonRetryableError, 'my_index'); + const task = Actions.setWriteBlock({ + client: clientWithNonRetryableError, + index: 'my_index', + }); await task(); expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(nonRetryableError); }); @@ -186,7 +211,7 @@ describe('actions', () => { describe('updateAliases', () => { it('calls catchRetryableEsClientErrors when the promise rejects', async () => { - const task = Actions.updateAliases(client, []); + const task = Actions.updateAliases({ client, aliasActions: [] }); try { await task(); } catch (e) { @@ -196,7 +221,10 @@ describe('actions', () => { expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); }); it('re-throws non retry-able errors', async () => { - const task = Actions.setWriteBlock(clientWithNonRetryableError, 'my_index'); + const task = Actions.setWriteBlock({ + client: clientWithNonRetryableError, + index: 'my_index', + }); await task(); expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(nonRetryableError); }); @@ -204,7 +232,11 @@ describe('actions', () => { describe('createIndex', () => { it('calls catchRetryableEsClientErrors when the promise rejects', async () => { - const task = Actions.createIndex(client, 'new_index', { properties: {} }); + const task = Actions.createIndex({ + client, + indexName: 'new_index', + mappings: { properties: {} }, + }); try { await task(); } catch (e) { @@ -214,7 +246,10 @@ describe('actions', () => { expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(retryableError); }); it('re-throws non retry-able errors', async () => { - const task = Actions.setWriteBlock(clientWithNonRetryableError, 'my_index'); + const task = Actions.setWriteBlock({ + client: clientWithNonRetryableError, + index: 'my_index', + }); await task(); expect(catchRetryableEsClientErrors).toHaveBeenCalledWith(nonRetryableError); }); @@ -222,7 +257,11 @@ describe('actions', () => { describe('updateAndPickupMappings', () => { it('calls catchRetryableEsClientErrors when the promise rejects', async () => { - const task = Actions.updateAndPickupMappings(client, 'new_index', { properties: {} }); + const task = Actions.updateAndPickupMappings({ + client, + index: 'new_index', + mappings: { properties: {} }, + }); try { await task(); } catch (e) { @@ -276,7 +315,12 @@ describe('actions', () => { describe('bulkOverwriteTransformedDocuments', () => { it('calls catchRetryableEsClientErrors when the promise rejects', async () => { - const task = Actions.bulkOverwriteTransformedDocuments(client, 'new_index', [], 'wait_for'); + const task = Actions.bulkOverwriteTransformedDocuments({ + client, + index: 'new_index', + transformedDocs: [], + refresh: 'wait_for', + }); try { await task(); } catch (e) { @@ -289,7 +333,7 @@ describe('actions', () => { describe('refreshIndex', () => { it('calls catchRetryableEsClientErrors when the promise rejects', async () => { - const task = Actions.refreshIndex(client, 'target_index'); + const task = Actions.refreshIndex({ client, targetIndex: 'target_index' }); try { await task(); } catch (e) { diff --git a/src/core/server/saved_objects/migrationsv2/actions/index.ts b/src/core/server/saved_objects/migrationsv2/actions/index.ts index c2e0476960c3b3..a6f61a521b7ac6 100644 --- a/src/core/server/saved_objects/migrationsv2/actions/index.ts +++ b/src/core/server/saved_objects/migrationsv2/actions/index.ts @@ -68,14 +68,20 @@ export type FetchIndexResponse = Record< { aliases: Record; mappings: IndexMapping; settings: unknown } >; +/** @internal */ +export interface FetchIndicesParams { + client: ElasticsearchClient; + indicesToFetch: string[]; +} + /** * Fetches information about the given indices including aliases, mappings and * settings. */ -export const fetchIndices = ( - client: ElasticsearchClient, - indicesToFetch: string[] -): TaskEither.TaskEither => +export const fetchIndices = ({ + client, + indicesToFetch, +}: FetchIndicesParams): TaskEither.TaskEither => // @ts-expect-error @elastic/elasticsearch IndexState.alias and IndexState.mappings should be required () => { return client.indices @@ -96,6 +102,12 @@ export interface IndexNotFound { type: 'index_not_found_exception'; index: string; } + +/** @internal */ +export interface SetWriteBlockParams { + client: ElasticsearchClient; + index: string; +} /** * Sets a write block in place for the given index. If the response includes * `acknowledged: true` all in-progress writes have drained and no further @@ -105,10 +117,10 @@ export interface IndexNotFound { * include `shards_acknowledged: true` but once the block is in place, * subsequent calls return `shards_acknowledged: false` */ -export const setWriteBlock = ( - client: ElasticsearchClient, - index: string -): TaskEither.TaskEither< +export const setWriteBlock = ({ + client, + index, +}: SetWriteBlockParams): TaskEither.TaskEither< IndexNotFound | RetryableEsClientError, 'set_write_block_succeeded' > => () => { @@ -145,13 +157,21 @@ export const setWriteBlock = ( ); }; +/** @internal */ +export interface RemoveWriteBlockParams { + client: ElasticsearchClient; + index: string; +} /** * Removes a write block from an index */ -export const removeWriteBlock = ( - client: ElasticsearchClient, - index: string -): TaskEither.TaskEither => () => { +export const removeWriteBlock = ({ + client, + index, +}: RemoveWriteBlockParams): TaskEither.TaskEither< + RetryableEsClientError, + 'remove_write_block_succeeded' +> => () => { return client.indices .putSettings<{ acknowledged: boolean; @@ -182,6 +202,12 @@ export const removeWriteBlock = ( .catch(catchRetryableEsClientErrors); }; +/** @internal */ +export interface WaitForIndexStatusYellowParams { + client: ElasticsearchClient; + index: string; + timeout?: string; +} /** * A yellow index status means the index's primary shard is allocated and the * index is ready for searching/indexing documents, but ES wasn't able to @@ -193,11 +219,11 @@ export const removeWriteBlock = ( * yellow at any point in the future. So ultimately data-redundancy is up to * users to maintain. */ -export const waitForIndexStatusYellow = ( - client: ElasticsearchClient, - index: string, - timeout = DEFAULT_TIMEOUT -): TaskEither.TaskEither => () => { +export const waitForIndexStatusYellow = ({ + client, + index, + timeout = DEFAULT_TIMEOUT, +}: WaitForIndexStatusYellowParams): TaskEither.TaskEither => () => { return client.cluster .health({ index, wait_for_status: 'yellow', timeout }) .then(() => { @@ -208,6 +234,14 @@ export const waitForIndexStatusYellow = ( export type CloneIndexResponse = AcknowledgeResponse; +/** @internal */ +export interface CloneIndexParams { + client: ElasticsearchClient; + source: string; + target: string; + /** only used for testing */ + timeout?: string; +} /** * Makes a clone of the source index into the target. * @@ -218,13 +252,15 @@ export type CloneIndexResponse = AcknowledgeResponse; * - the first call will wait up to 120s for the cluster state and all shards * to be updated. */ -export const cloneIndex = ( - client: ElasticsearchClient, - source: string, - target: string, - /** only used for testing */ - timeout = DEFAULT_TIMEOUT -): TaskEither.TaskEither => { +export const cloneIndex = ({ + client, + source, + target, + timeout = DEFAULT_TIMEOUT, +}: CloneIndexParams): TaskEither.TaskEither< + RetryableEsClientError | IndexNotFound, + CloneIndexResponse +> => { const cloneTask: TaskEither.TaskEither< RetryableEsClientError | IndexNotFound, AcknowledgeResponse @@ -302,7 +338,7 @@ export const cloneIndex = ( } else { // Otherwise, wait until the target index has a 'green' status. return pipe( - waitForIndexStatusYellow(client, target, timeout), + waitForIndexStatusYellow({ client, index: target, timeout }), TaskEither.map((value) => { /** When the index status is 'green' we know that all shards were started */ return { acknowledged: true, shardsAcknowledged: true }; @@ -352,16 +388,22 @@ const catchWaitForTaskCompletionTimeout = ( } }; +/** @internal */ +export interface WaitForTaskParams { + client: ElasticsearchClient; + taskId: string; + timeout: string; +} /** * Blocks for up to 60s or until a task completes. * * TODO: delete completed tasks */ -const waitForTask = ( - client: ElasticsearchClient, - taskId: string, - timeout: string -): TaskEither.TaskEither< +const waitForTask = ({ + client, + taskId, + timeout, +}: WaitForTaskParams): TaskEither.TaskEither< RetryableEsClientError | WaitForTaskCompletionTimeout, WaitForTaskResponse > => () => { @@ -433,16 +475,21 @@ export interface OpenPitResponse { pitId: string; } +/** @internal */ +export interface OpenPitParams { + client: ElasticsearchClient; + index: string; +} // how long ES should keep PIT alive const pitKeepAlive = '10m'; /* * Creates a lightweight view of data when the request has been initiated. * See https://www.elastic.co/guide/en/elasticsearch/reference/current/point-in-time-api.html * */ -export const openPit = ( - client: ElasticsearchClient, - index: string -): TaskEither.TaskEither => () => { +export const openPit = ({ + client, + index, +}: OpenPitParams): TaskEither.TaskEither => () => { return client .openPointInTime({ index, @@ -459,17 +506,28 @@ export interface ReadWithPit { readonly totalHits: number | undefined; } +/** @internal */ + +export interface ReadWithPitParams { + client: ElasticsearchClient; + pitId: string; + query: estypes.QueryContainer; + batchSize: number; + searchAfter?: number[]; + seqNoPrimaryTerm?: boolean; +} + /* * Requests documents from the index using PIT mechanism. * */ -export const readWithPit = ( - client: ElasticsearchClient, - pitId: string, - query: estypes.QueryContainer, - batchSize: number, - searchAfter?: number[], - seqNoPrimaryTerm?: boolean -): TaskEither.TaskEither => () => { +export const readWithPit = ({ + client, + pitId, + query, + batchSize, + searchAfter, + seqNoPrimaryTerm, +}: ReadWithPitParams): TaskEither.TaskEither => () => { return client .search({ seq_no_primary_term: seqNoPrimaryTerm, @@ -516,14 +574,19 @@ export const readWithPit = ( .catch(catchRetryableEsClientErrors); }; +/** @internal */ +export interface ClosePitParams { + client: ElasticsearchClient; + pitId: string; +} /* * Closes PIT. * See https://www.elastic.co/guide/en/elasticsearch/reference/current/point-in-time-api.html * */ -export const closePit = ( - client: ElasticsearchClient, - pitId: string -): TaskEither.TaskEither => () => { +export const closePit = ({ + client, + pitId, +}: ClosePitParams): TaskEither.TaskEither => () => { return client .closePointInTime({ body: { id: pitId }, @@ -537,27 +600,42 @@ export const closePit = ( .catch(catchRetryableEsClientErrors); }; +/** @internal */ +export interface TransformDocsParams { + transformRawDocs: TransformRawDocs; + outdatedDocuments: SavedObjectsRawDoc[]; +} /* * Transform outdated docs * */ -export const transformDocs = ( - transformRawDocs: TransformRawDocs, - outdatedDocuments: SavedObjectsRawDoc[] -): TaskEither.TaskEither => - transformRawDocs(outdatedDocuments); +export const transformDocs = ({ + transformRawDocs, + outdatedDocuments, +}: TransformDocsParams): TaskEither.TaskEither< + DocumentsTransformFailed, + DocumentsTransformSuccess +> => transformRawDocs(outdatedDocuments); /** @internal */ export interface ReindexResponse { taskId: string; } +/** @internal */ +export interface RefreshIndexParams { + client: ElasticsearchClient; + targetIndex: string; +} /** * Wait for Elasticsearch to reindex all the changes. */ -export const refreshIndex = ( - client: ElasticsearchClient, - targetIndex: string -): TaskEither.TaskEither => () => { +export const refreshIndex = ({ + client, + targetIndex, +}: RefreshIndexParams): TaskEither.TaskEither< + RetryableEsClientError, + { refreshed: boolean } +> => () => { return client.indices .refresh({ index: targetIndex, @@ -567,6 +645,19 @@ export const refreshIndex = ( }) .catch(catchRetryableEsClientErrors); }; +/** @internal */ +export interface ReindexParams { + client: ElasticsearchClient; + sourceIndex: string; + targetIndex: string; + reindexScript: Option.Option; + requireAlias: boolean; + /* When reindexing we use a source query to exclude saved objects types which + * are no longer used. These saved objects will still be kept in the outdated + * index for backup purposes, but won't be available in the upgraded index. + */ + unusedTypesQuery: estypes.QueryContainer; +} /** * Reindex documents from the `sourceIndex` into the `targetIndex`. Returns a * task ID which can be tracked for progress. @@ -575,18 +666,14 @@ export const refreshIndex = ( * this in parallel. By using `op_type: 'create', conflicts: 'proceed'` there * will be only one write per reindexed document. */ -export const reindex = ( - client: ElasticsearchClient, - sourceIndex: string, - targetIndex: string, - reindexScript: Option.Option, - requireAlias: boolean, - /* When reindexing we use a source query to exclude saved objects types which - * are no longer used. These saved objects will still be kept in the outdated - * index for backup purposes, but won't be available in the upgraded index. - */ - unusedTypesQuery: estypes.QueryContainer -): TaskEither.TaskEither => () => { +export const reindex = ({ + client, + sourceIndex, + targetIndex, + reindexScript, + requireAlias, + unusedTypesQuery, +}: ReindexParams): TaskEither.TaskEither => () => { return client .reindex({ // Require targetIndex to be an alias. Prevents a new index from being @@ -688,11 +775,18 @@ export const waitForReindexTask = flow( ) ); -export const verifyReindex = ( - client: ElasticsearchClient, - sourceIndex: string, - targetIndex: string -): TaskEither.TaskEither< +/** @internal */ +export interface VerifyReindexParams { + client: ElasticsearchClient; + sourceIndex: string; + targetIndex: string; +} + +export const verifyReindex = ({ + client, + sourceIndex, + targetIndex, +}: VerifyReindexParams): TaskEither.TaskEither< RetryableEsClientError | { type: 'verify_reindex_failed' }, 'verify_reindex_succeeded' > => () => { @@ -762,13 +856,18 @@ export type AliasAction = | { remove: { index: string; alias: string; must_exist: boolean } } | { add: { index: string; alias: string } }; +/** @internal */ +export interface UpdateAliasesParams { + client: ElasticsearchClient; + aliasActions: AliasAction[]; +} /** * Calls the Update index alias API `_alias` with the provided alias actions. */ -export const updateAliases = ( - client: ElasticsearchClient, - aliasActions: AliasAction[] -): TaskEither.TaskEither< +export const updateAliases = ({ + client, + aliasActions, +}: UpdateAliasesParams): TaskEither.TaskEither< IndexNotFound | AliasNotFound | RemoveIndexNotAConcreteIndex | RetryableEsClientError, 'update_aliases_succeeded' > => () => { @@ -836,6 +935,14 @@ function aliasArrayToRecord(aliases: string[]): Record { } return result; } + +/** @internal */ +export interface CreateIndexParams { + client: ElasticsearchClient; + indexName: string; + mappings: IndexMapping; + aliases?: string[]; +} /** * Creates an index with the given mappings * @@ -846,12 +953,12 @@ function aliasArrayToRecord(aliases: string[]): Record { * - the first call will wait up to 120s for the cluster state and all shards * to be updated. */ -export const createIndex = ( - client: ElasticsearchClient, - indexName: string, - mappings: IndexMapping, - aliases: string[] = [] -): TaskEither.TaskEither => { +export const createIndex = ({ + client, + indexName, + mappings, + aliases = [], +}: CreateIndexParams): TaskEither.TaskEither => { const createIndexTask: TaskEither.TaskEither< RetryableEsClientError, AcknowledgeResponse @@ -930,7 +1037,7 @@ export const createIndex = ( } else { // Otherwise, wait until the target index has a 'yellow' status. return pipe( - waitForIndexStatusYellow(client, indexName, DEFAULT_TIMEOUT), + waitForIndexStatusYellow({ client, index: indexName, timeout: DEFAULT_TIMEOUT }), TaskEither.map(() => { /** When the index status is 'yellow' we know that all shards were started */ return 'create_index_succeeded'; @@ -946,15 +1053,24 @@ export interface UpdateAndPickupMappingsResponse { taskId: string; } +/** @internal */ +export interface UpdateAndPickupMappingsParams { + client: ElasticsearchClient; + index: string; + mappings: IndexMapping; +} /** * Updates an index's mappings and runs an pickupUpdatedMappings task so that the mapping * changes are "picked up". Returns a taskId to track progress. */ -export const updateAndPickupMappings = ( - client: ElasticsearchClient, - index: string, - mappings: IndexMapping -): TaskEither.TaskEither => { +export const updateAndPickupMappings = ({ + client, + index, + mappings, +}: UpdateAndPickupMappingsParams): TaskEither.TaskEither< + RetryableEsClientError, + UpdateAndPickupMappingsResponse +> => { const putMappingTask: TaskEither.TaskEither< RetryableEsClientError, 'update_mappings_succeeded' @@ -1053,16 +1169,26 @@ export const searchForOutdatedDocuments = ( .catch(catchRetryableEsClientErrors); }; +/** @internal */ +export interface BulkOverwriteTransformedDocumentsParams { + client: ElasticsearchClient; + index: string; + transformedDocs: SavedObjectsRawDoc[]; + refresh: estypes.Refresh; +} /** * Write the up-to-date transformed documents to the index, overwriting any * documents that are still on their outdated version. */ -export const bulkOverwriteTransformedDocuments = ( - client: ElasticsearchClient, - index: string, - transformedDocs: SavedObjectsRawDoc[], - refresh: estypes.Refresh -): TaskEither.TaskEither => () => { +export const bulkOverwriteTransformedDocuments = ({ + client, + index, + transformedDocs, + refresh, +}: BulkOverwriteTransformedDocumentsParams): TaskEither.TaskEither< + RetryableEsClientError, + 'bulk_index_succeeded' +> => () => { return client .bulk({ // Because we only add aliases in the MARK_VERSION_INDEX_READY step we diff --git a/src/core/server/saved_objects/migrationsv2/integration_tests/actions.test.ts b/src/core/server/saved_objects/migrationsv2/integration_tests/actions.test.ts index d0158a4c68f246..b348eb69c4c16c 100644 --- a/src/core/server/saved_objects/migrationsv2/integration_tests/actions.test.ts +++ b/src/core/server/saved_objects/migrationsv2/integration_tests/actions.test.ts @@ -67,9 +67,13 @@ describe('migration actions', () => { client = start.elasticsearch.client.asInternalUser; // Create test fixture data: - await createIndex(client, 'existing_index_with_docs', { - dynamic: true, - properties: {}, + await createIndex({ + client, + indexName: 'existing_index_with_docs', + mappings: { + dynamic: true, + properties: {}, + }, })(); const sourceDocs = ([ { _source: { title: 'doc 1' } }, @@ -78,25 +82,30 @@ describe('migration actions', () => { { _source: { title: 'saved object 4', type: 'another_unused_type' } }, { _source: { title: 'f-agent-event 5', type: 'f_agent_event' } }, ] as unknown) as SavedObjectsRawDoc[]; - await bulkOverwriteTransformedDocuments( + await bulkOverwriteTransformedDocuments({ + client, + index: 'existing_index_with_docs', + transformedDocs: sourceDocs, + refresh: 'wait_for', + })(); + + await createIndex({ client, indexName: 'existing_index_2', mappings: { properties: {} } })(); + await createIndex({ client, - 'existing_index_with_docs', - sourceDocs, - 'wait_for' - )(); - - await createIndex(client, 'existing_index_2', { properties: {} })(); - await createIndex(client, 'existing_index_with_write_block', { properties: {} })(); - await bulkOverwriteTransformedDocuments( + indexName: 'existing_index_with_write_block', + mappings: { properties: {} }, + })(); + await bulkOverwriteTransformedDocuments({ client, - 'existing_index_with_write_block', - sourceDocs, - 'wait_for' - )(); - await setWriteBlock(client, 'existing_index_with_write_block')(); - await updateAliases(client, [ - { add: { index: 'existing_index_2', alias: 'existing_index_2_alias' } }, - ])(); + index: 'existing_index_with_write_block', + transformedDocs: sourceDocs, + refresh: 'wait_for', + })(); + await setWriteBlock({ client, index: 'existing_index_with_write_block' })(); + await updateAliases({ + client, + aliasActions: [{ add: { index: 'existing_index_2', alias: 'existing_index_2_alias' } }], + })(); }); afterAll(async () => { @@ -107,7 +116,7 @@ describe('migration actions', () => { describe('fetchIndices', () => { it('resolves right empty record if no indices were found', async () => { expect.assertions(1); - const task = fetchIndices(client, ['no_such_index']); + const task = fetchIndices({ client, indicesToFetch: ['no_such_index'] }); await expect(task()).resolves.toMatchInlineSnapshot(` Object { "_tag": "Right", @@ -117,10 +126,10 @@ describe('migration actions', () => { }); it('resolves right record with found indices', async () => { expect.assertions(1); - const res = (await fetchIndices(client, [ - 'no_such_index', - 'existing_index_with_docs', - ])()) as Either.Right; + const res = (await fetchIndices({ + client, + indicesToFetch: ['no_such_index', 'existing_index_with_docs'], + })()) as Either.Right; expect(res.right).toEqual( expect.objectContaining({ @@ -136,11 +145,15 @@ describe('migration actions', () => { describe('setWriteBlock', () => { beforeAll(async () => { - await createIndex(client, 'new_index_without_write_block', { properties: {} })(); + await createIndex({ + client, + indexName: 'new_index_without_write_block', + mappings: { properties: {} }, + })(); }); it('resolves right when setting the write block succeeds', async () => { expect.assertions(1); - const task = setWriteBlock(client, 'new_index_without_write_block'); + const task = setWriteBlock({ client, index: 'new_index_without_write_block' }); await expect(task()).resolves.toMatchInlineSnapshot(` Object { "_tag": "Right", @@ -150,7 +163,7 @@ describe('migration actions', () => { }); it('resolves right when setting a write block on an index that already has one', async () => { expect.assertions(1); - const task = setWriteBlock(client, 'existing_index_with_write_block'); + const task = setWriteBlock({ client, index: 'existing_index_with_write_block' }); await expect(task()).resolves.toMatchInlineSnapshot(` Object { "_tag": "Right", @@ -160,7 +173,7 @@ describe('migration actions', () => { }); it('once resolved, prevents further writes to the index', async () => { expect.assertions(1); - const task = setWriteBlock(client, 'new_index_without_write_block'); + const task = setWriteBlock({ client, index: 'new_index_without_write_block' }); await task(); const sourceDocs = ([ { _source: { title: 'doc 1' } }, @@ -169,17 +182,17 @@ describe('migration actions', () => { { _source: { title: 'doc 4' } }, ] as unknown) as SavedObjectsRawDoc[]; await expect( - bulkOverwriteTransformedDocuments( + bulkOverwriteTransformedDocuments({ client, - 'new_index_without_write_block', - sourceDocs, - 'wait_for' - )() + index: 'new_index_without_write_block', + transformedDocs: sourceDocs, + refresh: 'wait_for', + })() ).rejects.toMatchObject(expect.anything()); }); it('resolves left index_not_found_exception when the index does not exist', async () => { expect.assertions(1); - const task = setWriteBlock(client, 'no_such_index'); + const task = setWriteBlock({ client, index: 'no_such_index' }); await expect(task()).resolves.toMatchInlineSnapshot(` Object { "_tag": "Left", @@ -194,13 +207,21 @@ describe('migration actions', () => { describe('removeWriteBlock', () => { beforeAll(async () => { - await createIndex(client, 'existing_index_without_write_block_2', { properties: {} })(); - await createIndex(client, 'existing_index_with_write_block_2', { properties: {} })(); - await setWriteBlock(client, 'existing_index_with_write_block_2')(); + await createIndex({ + client, + indexName: 'existing_index_without_write_block_2', + mappings: { properties: {} }, + })(); + await createIndex({ + client, + indexName: 'existing_index_with_write_block_2', + mappings: { properties: {} }, + })(); + await setWriteBlock({ client, index: 'existing_index_with_write_block_2' })(); }); it('resolves right if successful when an index already has a write block', async () => { expect.assertions(1); - const task = removeWriteBlock(client, 'existing_index_with_write_block_2'); + const task = removeWriteBlock({ client, index: 'existing_index_with_write_block_2' }); await expect(task()).resolves.toMatchInlineSnapshot(` Object { "_tag": "Right", @@ -210,7 +231,7 @@ describe('migration actions', () => { }); it('resolves right if successful when an index does not have a write block', async () => { expect.assertions(1); - const task = removeWriteBlock(client, 'existing_index_without_write_block_2'); + const task = removeWriteBlock({ client, index: 'existing_index_without_write_block_2' }); await expect(task()).resolves.toMatchInlineSnapshot(` Object { "_tag": "Right", @@ -220,7 +241,7 @@ describe('migration actions', () => { }); it('rejects if there is a non-retryable error', async () => { expect.assertions(1); - const task = removeWriteBlock(client, 'no_such_index'); + const task = removeWriteBlock({ client, index: 'no_such_index' }); await expect(task()).rejects.toMatchInlineSnapshot( `[ResponseError: index_not_found_exception]` ); @@ -251,7 +272,10 @@ describe('migration actions', () => { ); // Start tracking the index status - const indexStatusPromise = waitForIndexStatusYellow(client, 'red_then_yellow_index')(); + const indexStatusPromise = waitForIndexStatusYellow({ + client, + index: 'red_then_yellow_index', + })(); const redStatusResponse = await client.cluster.health({ index: 'red_then_yellow_index' }); expect(redStatusResponse.body.status).toBe('red'); @@ -281,7 +305,11 @@ describe('migration actions', () => { } }); it('resolves right if cloning into a new target index', async () => { - const task = cloneIndex(client, 'existing_index_with_write_block', 'clone_target_1'); + const task = cloneIndex({ + client, + source: 'existing_index_with_write_block', + target: 'clone_target_1', + }); expect.assertions(1); await expect(task()).resolves.toMatchInlineSnapshot(` Object { @@ -314,11 +342,11 @@ describe('migration actions', () => { .catch((e) => {}); // Call clone even though the index already exists - const cloneIndexPromise = cloneIndex( + const cloneIndexPromise = cloneIndex({ client, - 'existing_index_with_write_block', - 'clone_red_then_yellow_index' - )(); + source: 'existing_index_with_write_block', + target: 'clone_red_then_yellow_index', + })(); let indexYellow = false; setTimeout(() => { @@ -348,7 +376,7 @@ describe('migration actions', () => { }); it('resolves left index_not_found_exception if the source index does not exist', async () => { expect.assertions(1); - const task = cloneIndex(client, 'no_such_index', 'clone_target_3'); + const task = cloneIndex({ client, source: 'no_such_index', target: 'clone_target_3' }); await expect(task()).resolves.toMatchInlineSnapshot(` Object { "_tag": "Left", @@ -378,12 +406,12 @@ describe('migration actions', () => { .catch((e) => {}); // Call clone even though the index already exists - const cloneIndexPromise = cloneIndex( + const cloneIndexPromise = cloneIndex({ client, - 'existing_index_with_write_block', - 'clone_red_index', - '0s' - )(); + source: 'existing_index_with_write_block', + target: 'clone_red_index', + timeout: '0s', + })(); await cloneIndexPromise.then((res) => { expect(res).toMatchInlineSnapshot(` @@ -404,15 +432,15 @@ describe('migration actions', () => { // together with waitForReindexTask describe('reindex & waitForReindexTask', () => { it('resolves right when reindex succeeds without reindex script', async () => { - const res = (await reindex( + const res = (await reindex({ client, - 'existing_index_with_docs', - 'reindex_target', - Option.none, - false, - { match_all: {} } - )()) as Either.Right; - const task = waitForReindexTask(client, res.right.taskId, '10s'); + sourceIndex: 'existing_index_with_docs', + targetIndex: 'reindex_target', + reindexScript: Option.none, + requireAlias: false, + unusedTypesQuery: { match_all: {} }, + })()) as Either.Right; + const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' }); await expect(task()).resolves.toMatchInlineSnapshot(` Object { "_tag": "Right", @@ -436,21 +464,21 @@ describe('migration actions', () => { `); }); it('resolves right and excludes all documents not matching the unusedTypesQuery', async () => { - const res = (await reindex( + const res = (await reindex({ client, - 'existing_index_with_docs', - 'reindex_target_excluded_docs', - Option.none, - false, - { + sourceIndex: 'existing_index_with_docs', + targetIndex: 'reindex_target_excluded_docs', + reindexScript: Option.none, + requireAlias: false, + unusedTypesQuery: { bool: { must_not: ['f_agent_event', 'another_unused_type'].map((type) => ({ term: { type }, })), }, - } - )()) as Either.Right; - const task = waitForReindexTask(client, res.right.taskId, '10s'); + }, + })()) as Either.Right; + const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' }); await expect(task()).resolves.toMatchInlineSnapshot(` Object { "_tag": "Right", @@ -473,15 +501,15 @@ describe('migration actions', () => { }); it('resolves right when reindex succeeds with reindex script', async () => { expect.assertions(2); - const res = (await reindex( + const res = (await reindex({ client, - 'existing_index_with_docs', - 'reindex_target_2', - Option.some(`ctx._source.title = ctx._source.title + '_updated'`), - false, - { match_all: {} } - )()) as Either.Right; - const task = waitForReindexTask(client, res.right.taskId, '10s'); + sourceIndex: 'existing_index_with_docs', + targetIndex: 'reindex_target_2', + reindexScript: Option.some(`ctx._source.title = ctx._source.title + '_updated'`), + requireAlias: false, + unusedTypesQuery: { match_all: {} }, + })()) as Either.Right; + const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' }); await expect(task()).resolves.toMatchInlineSnapshot(` Object { "_tag": "Right", @@ -506,15 +534,15 @@ describe('migration actions', () => { it('resolves right, ignores version conflicts and does not update existing docs when reindex multiple times', async () => { expect.assertions(3); // Reindex with a script - let res = (await reindex( + let res = (await reindex({ client, - 'existing_index_with_docs', - 'reindex_target_3', - Option.some(`ctx._source.title = ctx._source.title + '_updated'`), - false, - { match_all: {} } - )()) as Either.Right; - let task = waitForReindexTask(client, res.right.taskId, '10s'); + sourceIndex: 'existing_index_with_docs', + targetIndex: 'reindex_target_3', + reindexScript: Option.some(`ctx._source.title = ctx._source.title + '_updated'`), + requireAlias: false, + unusedTypesQuery: { match_all: {} }, + })()) as Either.Right; + let task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' }); await expect(task()).resolves.toMatchInlineSnapshot(` Object { "_tag": "Right", @@ -523,15 +551,15 @@ describe('migration actions', () => { `); // reindex without a script - res = (await reindex( + res = (await reindex({ client, - 'existing_index_with_docs', - 'reindex_target_3', - Option.none, - false, - { match_all: {} } - )()) as Either.Right; - task = waitForReindexTask(client, res.right.taskId, '10s'); + sourceIndex: 'existing_index_with_docs', + targetIndex: 'reindex_target_3', + reindexScript: Option.none, + requireAlias: false, + unusedTypesQuery: { match_all: {} }, + })()) as Either.Right; + task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' }); await expect(task()).resolves.toMatchInlineSnapshot(` Object { "_tag": "Right", @@ -559,7 +587,7 @@ describe('migration actions', () => { expect.assertions(2); // Simulate a reindex that only adds some of the documents from the // source index into the target index - await createIndex(client, 'reindex_target_4', { properties: {} })(); + await createIndex({ client, indexName: 'reindex_target_4', mappings: { properties: {} } })(); const sourceDocs = ((await searchForOutdatedDocuments(client, { batchSize: 1000, targetIndex: 'existing_index_with_docs', @@ -570,18 +598,23 @@ describe('migration actions', () => { _id, _source, })); - await bulkOverwriteTransformedDocuments(client, 'reindex_target_4', sourceDocs, 'wait_for')(); + await bulkOverwriteTransformedDocuments({ + client, + index: 'reindex_target_4', + transformedDocs: sourceDocs, + refresh: 'wait_for', + })(); // Now do a real reindex - const res = (await reindex( + const res = (await reindex({ client, - 'existing_index_with_docs', - 'reindex_target_4', - Option.some(`ctx._source.title = ctx._source.title + '_updated'`), - false, - { match_all: {} } - )()) as Either.Right; - const task = waitForReindexTask(client, res.right.taskId, '10s'); + sourceIndex: 'existing_index_with_docs', + targetIndex: 'reindex_target_4', + reindexScript: Option.some(`ctx._source.title = ctx._source.title + '_updated'`), + requireAlias: false, + unusedTypesQuery: { match_all: {} }, + })()) as Either.Right; + const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' }); await expect(task()).resolves.toMatchInlineSnapshot(` Object { "_tag": "Right", @@ -614,24 +647,28 @@ describe('migration actions', () => { // and should ignore this error. // Create an index with incompatible mappings - await createIndex(client, 'reindex_target_5', { - dynamic: 'strict', - properties: { - /** no title field */ + await createIndex({ + client, + indexName: 'reindex_target_5', + mappings: { + dynamic: 'strict', + properties: { + /** no title field */ + }, }, })(); const { right: { taskId: reindexTaskId }, - } = (await reindex( + } = (await reindex({ client, - 'existing_index_with_docs', - 'reindex_target_5', - Option.none, - false, - { match_all: {} } - )()) as Either.Right; - const task = waitForReindexTask(client, reindexTaskId, '10s'); + sourceIndex: 'existing_index_with_docs', + targetIndex: 'reindex_target_5', + reindexScript: Option.none, + requireAlias: false, + unusedTypesQuery: { match_all: {} }, + })()) as Either.Right; + const task = waitForReindexTask({ client, taskId: reindexTaskId, timeout: '10s' }); await expect(task()).resolves.toMatchInlineSnapshot(` Object { @@ -651,22 +688,26 @@ describe('migration actions', () => { // and should ignore this error. // Create an index with incompatible mappings - await createIndex(client, 'reindex_target_6', { - dynamic: false, - properties: { title: { type: 'integer' } }, // integer is incompatible with string title + await createIndex({ + client, + indexName: 'reindex_target_6', + mappings: { + dynamic: false, + properties: { title: { type: 'integer' } }, // integer is incompatible with string title + }, })(); const { right: { taskId: reindexTaskId }, - } = (await reindex( + } = (await reindex({ client, - 'existing_index_with_docs', - 'reindex_target_6', - Option.none, - false, - { match_all: {} } - )()) as Either.Right; - const task = waitForReindexTask(client, reindexTaskId, '10s'); + sourceIndex: 'existing_index_with_docs', + targetIndex: 'reindex_target_6', + reindexScript: Option.none, + requireAlias: false, + unusedTypesQuery: { match_all: {} }, + })()) as Either.Right; + const task = waitForReindexTask({ client, taskId: reindexTaskId, timeout: '10s' }); await expect(task()).resolves.toMatchInlineSnapshot(` Object { @@ -679,10 +720,17 @@ describe('migration actions', () => { }); it('resolves left index_not_found_exception if source index does not exist', async () => { expect.assertions(1); - const res = (await reindex(client, 'no_such_index', 'reindex_target', Option.none, false, { - match_all: {}, + const res = (await reindex({ + client, + sourceIndex: 'no_such_index', + targetIndex: 'reindex_target', + reindexScript: Option.none, + requireAlias: false, + unusedTypesQuery: { + match_all: {}, + }, })()) as Either.Right; - const task = waitForReindexTask(client, res.right.taskId, '10s'); + const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' }); await expect(task()).resolves.toMatchInlineSnapshot(` Object { "_tag": "Left", @@ -695,16 +743,16 @@ describe('migration actions', () => { }); it('resolves left target_index_had_write_block if all failures are due to a write block', async () => { expect.assertions(1); - const res = (await reindex( + const res = (await reindex({ client, - 'existing_index_with_docs', - 'existing_index_with_write_block', - Option.none, - false, - { match_all: {} } - )()) as Either.Right; + sourceIndex: 'existing_index_with_docs', + targetIndex: 'existing_index_with_write_block', + reindexScript: Option.none, + requireAlias: false, + unusedTypesQuery: { match_all: {} }, + })()) as Either.Right; - const task = waitForReindexTask(client, res.right.taskId, '10s'); + const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' }); await expect(task()).resolves.toMatchInlineSnapshot(` Object { @@ -717,16 +765,16 @@ describe('migration actions', () => { }); it('resolves left if requireAlias=true and the target is not an alias', async () => { expect.assertions(1); - const res = (await reindex( + const res = (await reindex({ client, - 'existing_index_with_docs', - 'existing_index_with_write_block', - Option.none, - true, - { match_all: {} } - )()) as Either.Right; + sourceIndex: 'existing_index_with_docs', + targetIndex: 'existing_index_with_write_block', + reindexScript: Option.none, + requireAlias: true, + unusedTypesQuery: { match_all: {} }, + })()) as Either.Right; - const task = waitForReindexTask(client, res.right.taskId, '10s'); + const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' }); await expect(task()).resolves.toMatchInlineSnapshot(` Object { @@ -739,16 +787,16 @@ describe('migration actions', () => { `); }); it('resolves left wait_for_task_completion_timeout when the task does not finish within the timeout', async () => { - const res = (await reindex( + const res = (await reindex({ client, - 'existing_index_with_docs', - 'reindex_target', - Option.none, - false, - { match_all: {} } - )()) as Either.Right; + sourceIndex: 'existing_index_with_docs', + targetIndex: 'reindex_target', + reindexScript: Option.none, + requireAlias: false, + unusedTypesQuery: { match_all: {} }, + })()) as Either.Right; - const task = waitForReindexTask(client, res.right.taskId, '0s'); + const task = waitForReindexTask({ client, taskId: res.right.taskId, timeout: '0s' }); await expect(task()).resolves.toMatchObject({ _tag: 'Left', @@ -766,17 +814,21 @@ describe('migration actions', () => { describe('verifyReindex', () => { it('resolves right if source and target indices have the same amount of documents', async () => { expect.assertions(1); - const res = (await reindex( + const res = (await reindex({ client, - 'existing_index_with_docs', - 'reindex_target_7', - Option.none, - false, - { match_all: {} } - )()) as Either.Right; - await waitForReindexTask(client, res.right.taskId, '10s')(); - - const task = verifyReindex(client, 'existing_index_with_docs', 'reindex_target_7'); + sourceIndex: 'existing_index_with_docs', + targetIndex: 'reindex_target_7', + reindexScript: Option.none, + requireAlias: false, + unusedTypesQuery: { match_all: {} }, + })()) as Either.Right; + await waitForReindexTask({ client, taskId: res.right.taskId, timeout: '10s' })(); + + const task = verifyReindex({ + client, + sourceIndex: 'existing_index_with_docs', + targetIndex: 'reindex_target_7', + }); await expect(task()).resolves.toMatchInlineSnapshot(` Object { "_tag": "Right", @@ -786,7 +838,11 @@ describe('migration actions', () => { }); it('resolves left if source and target indices have different amount of documents', async () => { expect.assertions(1); - const task = verifyReindex(client, 'existing_index_with_docs', 'existing_index_2'); + const task = verifyReindex({ + client, + sourceIndex: 'existing_index_with_docs', + targetIndex: 'existing_index_2', + }); await expect(task()).resolves.toMatchInlineSnapshot(` Object { "_tag": "Left", @@ -798,19 +854,27 @@ describe('migration actions', () => { }); it('rejects if source or target index does not exist', async () => { expect.assertions(2); - let task = verifyReindex(client, 'no_such_index', 'existing_index_2'); + let task = verifyReindex({ + client, + sourceIndex: 'no_such_index', + targetIndex: 'existing_index_2', + }); await expect(task()).rejects.toMatchInlineSnapshot( `[ResponseError: index_not_found_exception]` ); - task = verifyReindex(client, 'existing_index_2', 'no_such_index'); + task = verifyReindex({ + client, + sourceIndex: 'existing_index_2', + targetIndex: 'no_such_index', + }); await expect(task()).rejects.toThrow('index_not_found_exception'); }); }); describe('openPit', () => { it('opens PointInTime for an index', async () => { - const openPitTask = openPit(client, 'existing_index_with_docs'); + const openPitTask = openPit({ client, index: 'existing_index_with_docs' }); const pitResponse = (await openPitTask()) as Either.Right; expect(pitResponse.right.pitId).toEqual(expect.any(String)); @@ -824,52 +888,52 @@ describe('migration actions', () => { await expect(searchResponse.body.hits.hits.length).toBeGreaterThan(0); }); it('rejects if index does not exist', async () => { - const openPitTask = openPit(client, 'no_such_index'); + const openPitTask = openPit({ client, index: 'no_such_index' }); await expect(openPitTask()).rejects.toThrow('index_not_found_exception'); }); }); describe('readWithPit', () => { it('requests documents from an index using given PIT', async () => { - const openPitTask = openPit(client, 'existing_index_with_docs'); + const openPitTask = openPit({ client, index: 'existing_index_with_docs' }); const pitResponse = (await openPitTask()) as Either.Right; - const readWithPitTask = readWithPit( + const readWithPitTask = readWithPit({ client, - pitResponse.right.pitId, - { match_all: {} }, - 1000, - undefined - ); + pitId: pitResponse.right.pitId, + query: { match_all: {} }, + batchSize: 1000, + searchAfter: undefined, + }); const docsResponse = (await readWithPitTask()) as Either.Right; await expect(docsResponse.right.outdatedDocuments.length).toBe(5); }); it('requests the batchSize of documents from an index', async () => { - const openPitTask = openPit(client, 'existing_index_with_docs'); + const openPitTask = openPit({ client, index: 'existing_index_with_docs' }); const pitResponse = (await openPitTask()) as Either.Right; - const readWithPitTask = readWithPit( + const readWithPitTask = readWithPit({ client, - pitResponse.right.pitId, - { match_all: {} }, - 3, - undefined - ); + pitId: pitResponse.right.pitId, + query: { match_all: {} }, + batchSize: 3, + searchAfter: undefined, + }); const docsResponse = (await readWithPitTask()) as Either.Right; await expect(docsResponse.right.outdatedDocuments.length).toBe(3); }); it('it excludes documents not matching the provided "query"', async () => { - const openPitTask = openPit(client, 'existing_index_with_docs'); + const openPitTask = openPit({ client, index: 'existing_index_with_docs' }); const pitResponse = (await openPitTask()) as Either.Right; - const readWithPitTask = readWithPit( + const readWithPitTask = readWithPit({ client, - pitResponse.right.pitId, - { + pitId: pitResponse.right.pitId, + query: { bool: { must_not: [ { @@ -885,9 +949,9 @@ describe('migration actions', () => { ], }, }, - 1000, - undefined - ); + batchSize: 1000, + searchAfter: undefined, + }); const docsResponse = (await readWithPitTask()) as Either.Right; @@ -902,18 +966,18 @@ describe('migration actions', () => { }); it('only returns documents that match the provided "query"', async () => { - const openPitTask = openPit(client, 'existing_index_with_docs'); + const openPitTask = openPit({ client, index: 'existing_index_with_docs' }); const pitResponse = (await openPitTask()) as Either.Right; - const readWithPitTask = readWithPit( + const readWithPitTask = readWithPit({ client, - pitResponse.right.pitId, - { + pitId: pitResponse.right.pitId, + query: { match: { title: { query: 'doc' } }, }, - 1000, - undefined - ); + batchSize: 1000, + searchAfter: undefined, + }); const docsResponse = (await readWithPitTask()) as Either.Right; @@ -928,19 +992,19 @@ describe('migration actions', () => { }); it('returns docs with _seq_no and _primary_term when specified', async () => { - const openPitTask = openPit(client, 'existing_index_with_docs'); + const openPitTask = openPit({ client, index: 'existing_index_with_docs' }); const pitResponse = (await openPitTask()) as Either.Right; - const readWithPitTask = readWithPit( + const readWithPitTask = readWithPit({ client, - pitResponse.right.pitId, - { + pitId: pitResponse.right.pitId, + query: { match: { title: { query: 'doc' } }, }, - 1000, - undefined, - true - ); + batchSize: 1000, + searchAfter: undefined, + seqNoPrimaryTerm: true, + }); const docsResponse = (await readWithPitTask()) as Either.Right; @@ -955,18 +1019,18 @@ describe('migration actions', () => { }); it('does not return docs with _seq_no and _primary_term if not specified', async () => { - const openPitTask = openPit(client, 'existing_index_with_docs'); + const openPitTask = openPit({ client, index: 'existing_index_with_docs' }); const pitResponse = (await openPitTask()) as Either.Right; - const readWithPitTask = readWithPit( + const readWithPitTask = readWithPit({ client, - pitResponse.right.pitId, - { + pitId: pitResponse.right.pitId, + query: { match: { title: { query: 'doc' } }, }, - 1000, - undefined - ); + batchSize: 1000, + searchAfter: undefined, + }); const docsResponse = (await readWithPitTask()) as Either.Right; @@ -981,24 +1045,24 @@ describe('migration actions', () => { }); it('rejects if PIT does not exist', async () => { - const readWithPitTask = readWithPit( + const readWithPitTask = readWithPit({ client, - 'no_such_pit', - { match_all: {} }, - 1000, - undefined - ); + pitId: 'no_such_pit', + query: { match_all: {} }, + batchSize: 1000, + searchAfter: undefined, + }); await expect(readWithPitTask()).rejects.toThrow('illegal_argument_exception'); }); }); describe('closePit', () => { it('closes PointInTime', async () => { - const openPitTask = openPit(client, 'existing_index_with_docs'); + const openPitTask = openPit({ client, index: 'existing_index_with_docs' }); const pitResponse = (await openPitTask()) as Either.Right; const pitId = pitResponse.right.pitId; - await closePit(client, pitId)(); + await closePit({ client, pitId })(); const searchTask = client.search({ body: { @@ -1010,7 +1074,7 @@ describe('migration actions', () => { }); it('rejects if PIT does not exist', async () => { - const closePitTask = closePit(client, 'no_such_pit'); + const closePitTask = closePit({ client, pitId: 'no_such_pit' }); await expect(closePitTask()).rejects.toThrow('illegal_argument_exception'); }); }); @@ -1034,7 +1098,10 @@ describe('migration actions', () => { return Either.right({ processedDocs }); }; } - const transformTask = transformDocs(innerTransformRawDocs, originalDocs); + const transformTask = transformDocs({ + transformRawDocs: innerTransformRawDocs, + outdatedDocuments: originalDocs, + }); const resultsWithProcessDocs = ((await transformTask()) as Either.Right) .right.processedDocs; @@ -1051,7 +1118,11 @@ describe('migration actions', () => { 'existing_index_with_write_block' )()) as Either.Right; - const task = waitForPickupUpdatedMappingsTask(client, res.right.taskId, '10s'); + const task = waitForPickupUpdatedMappingsTask({ + client, + taskId: res.right.taskId, + timeout: '10s', + }); // We can't do a snapshot match because the response includes an index // id which ES assigns dynamically @@ -1065,7 +1136,11 @@ describe('migration actions', () => { 'no_such_index' )()) as Either.Right; - const task = waitForPickupUpdatedMappingsTask(client, res.right.taskId, '10s'); + const task = waitForPickupUpdatedMappingsTask({ + client, + taskId: res.right.taskId, + timeout: '10s', + }); await expect(task()).rejects.toMatchInlineSnapshot(` [Error: pickupUpdatedMappings task failed with the following error: @@ -1078,7 +1153,11 @@ describe('migration actions', () => { 'existing_index_with_docs' )()) as Either.Right; - const task = waitForPickupUpdatedMappingsTask(client, res.right.taskId, '0s'); + const task = waitForPickupUpdatedMappingsTask({ + client, + taskId: res.right.taskId, + timeout: '0s', + }); await expect(task()).resolves.toMatchObject({ _tag: 'Left', @@ -1097,7 +1176,11 @@ describe('migration actions', () => { 'existing_index_with_docs' )()) as Either.Right; - const task = waitForPickupUpdatedMappingsTask(client, res.right.taskId, '10s'); + const task = waitForPickupUpdatedMappingsTask({ + client, + taskId: res.right.taskId, + timeout: '10s', + }); await expect(task()).resolves.toMatchInlineSnapshot(` Object { @@ -1111,9 +1194,13 @@ describe('migration actions', () => { describe('updateAndPickupMappings', () => { it('resolves right when mappings were updated and picked up', async () => { // Create an index without any mappings and insert documents into it - await createIndex(client, 'existing_index_without_mappings', { - dynamic: false, - properties: {}, + await createIndex({ + client, + indexName: 'existing_index_without_mappings', + mappings: { + dynamic: false, + properties: {}, + }, })(); const sourceDocs = ([ { _source: { title: 'doc 1' } }, @@ -1121,12 +1208,12 @@ describe('migration actions', () => { { _source: { title: 'doc 3' } }, { _source: { title: 'doc 4' } }, ] as unknown) as SavedObjectsRawDoc[]; - await bulkOverwriteTransformedDocuments( + await bulkOverwriteTransformedDocuments({ client, - 'existing_index_without_mappings', - sourceDocs, - 'wait_for' - )(); + index: 'existing_index_without_mappings', + transformedDocs: sourceDocs, + refresh: 'wait_for', + })(); // Assert that we can't search over the unmapped fields of the document const originalSearchResults = ((await searchForOutdatedDocuments(client, { @@ -1139,14 +1226,18 @@ describe('migration actions', () => { expect(originalSearchResults.length).toBe(0); // Update and pickup mappings so that the title field is searchable - const res = await updateAndPickupMappings(client, 'existing_index_without_mappings', { - properties: { - title: { type: 'text' }, + const res = await updateAndPickupMappings({ + client, + index: 'existing_index_without_mappings', + mappings: { + properties: { + title: { type: 'text' }, + }, }, })(); expect(Either.isRight(res)).toBe(true); const taskId = (res as Either.Right).right.taskId; - await waitForPickupUpdatedMappingsTask(client, taskId, '60s')(); + await waitForPickupUpdatedMappingsTask({ client, taskId, timeout: '60s' })(); // Repeat the search expecting to be able to find the existing documents const pickedUpSearchResults = ((await searchForOutdatedDocuments(client, { @@ -1163,15 +1254,18 @@ describe('migration actions', () => { describe('updateAliases', () => { describe('remove', () => { it('resolves left index_not_found_exception when the index does not exist', async () => { - const task = updateAliases(client, [ - { - remove: { - alias: 'no_such_alias', - index: 'no_such_index', - must_exist: false, + const task = updateAliases({ + client, + aliasActions: [ + { + remove: { + alias: 'no_such_alias', + index: 'no_such_index', + must_exist: false, + }, }, - }, - ]); + ], + }); await expect(task()).resolves.toMatchInlineSnapshot(` Object { "_tag": "Left", @@ -1184,15 +1278,18 @@ describe('migration actions', () => { }); describe('with must_exist=false', () => { it('resolves left alias_not_found_exception when alias does not exist', async () => { - const task = updateAliases(client, [ - { - remove: { - alias: 'no_such_alias', - index: 'existing_index_with_docs', - must_exist: false, + const task = updateAliases({ + client, + aliasActions: [ + { + remove: { + alias: 'no_such_alias', + index: 'existing_index_with_docs', + must_exist: false, + }, }, - }, - ]); + ], + }); await expect(task()).resolves.toMatchInlineSnapshot(` Object { "_tag": "Left", @@ -1205,15 +1302,18 @@ describe('migration actions', () => { }); describe('with must_exist=true', () => { it('resolves left alias_not_found_exception when alias does not exist on specified index', async () => { - const task = updateAliases(client, [ - { - remove: { - alias: 'existing_index_2_alias', - index: 'existing_index_with_docs', - must_exist: true, + const task = updateAliases({ + client, + aliasActions: [ + { + remove: { + alias: 'existing_index_2_alias', + index: 'existing_index_with_docs', + must_exist: true, + }, }, - }, - ]); + ], + }); await expect(task()).resolves.toMatchInlineSnapshot(` Object { "_tag": "Left", @@ -1224,15 +1324,18 @@ describe('migration actions', () => { `); }); it('resolves left alias_not_found_exception when alias does not exist', async () => { - const task = updateAliases(client, [ - { - remove: { - alias: 'no_such_alias', - index: 'existing_index_with_docs', - must_exist: true, + const task = updateAliases({ + client, + aliasActions: [ + { + remove: { + alias: 'no_such_alias', + index: 'existing_index_with_docs', + must_exist: true, + }, }, - }, - ]); + ], + }); await expect(task()).resolves.toMatchInlineSnapshot(` Object { "_tag": "Left", @@ -1246,13 +1349,16 @@ describe('migration actions', () => { }); describe('remove_index', () => { it('left index_not_found_exception if index does not exist', async () => { - const task = updateAliases(client, [ - { - remove_index: { - index: 'no_such_index', + const task = updateAliases({ + client, + aliasActions: [ + { + remove_index: { + index: 'no_such_index', + }, }, - }, - ]); + ], + }); await expect(task()).resolves.toMatchInlineSnapshot(` Object { "_tag": "Left", @@ -1264,13 +1370,16 @@ describe('migration actions', () => { `); }); it('left remove_index_not_a_concrete_index when remove_index targets an alias', async () => { - const task = updateAliases(client, [ - { - remove_index: { - index: 'existing_index_2_alias', + const task = updateAliases({ + client, + aliasActions: [ + { + remove_index: { + index: 'existing_index_2_alias', + }, }, - }, - ]); + ], + }); await expect(task()).resolves.toMatchInlineSnapshot(` Object { "_tag": "Left", @@ -1312,7 +1421,11 @@ describe('migration actions', () => { }); // Call createIndex even though the index already exists - const createIndexPromise = createIndex(client, 'red_then_yellow_index', undefined as any)(); + const createIndexPromise = createIndex({ + client, + indexName: 'red_then_yellow_index', + mappings: undefined as any, + })(); let indexYellow = false; setTimeout(() => { @@ -1341,7 +1454,7 @@ describe('migration actions', () => { // Creating an index with the same name as an existing alias to induce // failure await expect( - createIndex(client, 'existing_index_2_alias', undefined as any)() + createIndex({ client, indexName: 'existing_index_2_alias', mappings: undefined as any })() ).rejects.toMatchInlineSnapshot(`[ResponseError: invalid_index_name_exception]`); }); }); @@ -1353,12 +1466,12 @@ describe('migration actions', () => { { _source: { title: 'doc 6' } }, { _source: { title: 'doc 7' } }, ] as unknown) as SavedObjectsRawDoc[]; - const task = bulkOverwriteTransformedDocuments( + const task = bulkOverwriteTransformedDocuments({ client, - 'existing_index_with_docs', - newDocs, - 'wait_for' - ); + index: 'existing_index_with_docs', + transformedDocs: newDocs, + refresh: 'wait_for', + }); await expect(task()).resolves.toMatchInlineSnapshot(` Object { @@ -1374,12 +1487,15 @@ describe('migration actions', () => { outdatedDocumentsQuery: undefined, })()) as Either.Right).right.outdatedDocuments; - const task = bulkOverwriteTransformedDocuments( + const task = bulkOverwriteTransformedDocuments({ client, - 'existing_index_with_docs', - [...existingDocs, ({ _source: { title: 'doc 8' } } as unknown) as SavedObjectsRawDoc], - 'wait_for' - ); + index: 'existing_index_with_docs', + transformedDocs: [ + ...existingDocs, + ({ _source: { title: 'doc 8' } } as unknown) as SavedObjectsRawDoc, + ], + refresh: 'wait_for', + }); await expect(task()).resolves.toMatchInlineSnapshot(` Object { "_tag": "Right", @@ -1394,12 +1510,12 @@ describe('migration actions', () => { { _source: { title: 'doc 7' } }, ] as unknown) as SavedObjectsRawDoc[]; await expect( - bulkOverwriteTransformedDocuments( + bulkOverwriteTransformedDocuments({ client, - 'existing_index_with_write_block', - newDocs, - 'wait_for' - )() + index: 'existing_index_with_write_block', + transformedDocs: newDocs, + refresh: 'wait_for', + })() ).rejects.toMatchObject(expect.anything()); }); }); diff --git a/src/core/server/saved_objects/migrationsv2/migrations_state_machine_cleanup.ts b/src/core/server/saved_objects/migrationsv2/migrations_state_machine_cleanup.ts index 1881f9a712c293..e9cb33c0aa54a2 100644 --- a/src/core/server/saved_objects/migrationsv2/migrations_state_machine_cleanup.ts +++ b/src/core/server/saved_objects/migrationsv2/migrations_state_machine_cleanup.ts @@ -19,7 +19,7 @@ export async function cleanup( if (!state) return; if ('sourceIndexPitId' in state) { try { - await Actions.closePit(client, state.sourceIndexPitId)(); + await Actions.closePit({ client, pitId: state.sourceIndexPitId })(); } catch (e) { executionLog.push({ type: 'cleanup', diff --git a/src/core/server/saved_objects/migrationsv2/next.ts b/src/core/server/saved_objects/migrationsv2/next.ts index 07ebf80271d48c..e43223688d4bcd 100644 --- a/src/core/server/saved_objects/migrationsv2/next.ts +++ b/src/core/server/saved_objects/migrationsv2/next.ts @@ -58,38 +58,46 @@ export type ResponseType = UnwrapPromise< export const nextActionMap = (client: ElasticsearchClient, transformRawDocs: TransformRawDocs) => { return { INIT: (state: InitState) => - Actions.fetchIndices(client, [state.currentAlias, state.versionAlias]), + Actions.fetchIndices({ client, indicesToFetch: [state.currentAlias, state.versionAlias] }), WAIT_FOR_YELLOW_SOURCE: (state: WaitForYellowSourceState) => - Actions.waitForIndexStatusYellow(client, state.sourceIndex.value), + Actions.waitForIndexStatusYellow({ client, index: state.sourceIndex.value }), SET_SOURCE_WRITE_BLOCK: (state: SetSourceWriteBlockState) => - Actions.setWriteBlock(client, state.sourceIndex.value), + Actions.setWriteBlock({ client, index: state.sourceIndex.value }), CREATE_NEW_TARGET: (state: CreateNewTargetState) => - Actions.createIndex(client, state.targetIndex, state.targetIndexMappings), + Actions.createIndex({ + client, + indexName: state.targetIndex, + mappings: state.targetIndexMappings, + }), CREATE_REINDEX_TEMP: (state: CreateReindexTempState) => - Actions.createIndex(client, state.tempIndex, state.tempIndexMappings), + Actions.createIndex({ + client, + indexName: state.tempIndex, + mappings: state.tempIndexMappings, + }), REINDEX_SOURCE_TO_TEMP_OPEN_PIT: (state: ReindexSourceToTempOpenPit) => - Actions.openPit(client, state.sourceIndex.value), + Actions.openPit({ client, index: state.sourceIndex.value }), REINDEX_SOURCE_TO_TEMP_READ: (state: ReindexSourceToTempRead) => - Actions.readWithPit( + Actions.readWithPit({ client, - state.sourceIndexPitId, + pitId: state.sourceIndexPitId, /* When reading we use a source query to exclude saved objects types which * are no longer used. These saved objects will still be kept in the outdated * index for backup purposes, but won't be available in the upgraded index. */ - state.unusedTypesQuery, - state.batchSize, - state.lastHitSortValue - ), + query: state.unusedTypesQuery, + batchSize: state.batchSize, + searchAfter: state.lastHitSortValue, + }), REINDEX_SOURCE_TO_TEMP_CLOSE_PIT: (state: ReindexSourceToTempClosePit) => - Actions.closePit(client, state.sourceIndexPitId), + Actions.closePit({ client, pitId: state.sourceIndexPitId }), REINDEX_SOURCE_TO_TEMP_INDEX: (state: ReindexSourceToTempIndex) => - Actions.transformDocs(transformRawDocs, state.outdatedDocuments), + Actions.transformDocs({ transformRawDocs, outdatedDocuments: state.outdatedDocuments }), REINDEX_SOURCE_TO_TEMP_INDEX_BULK: (state: ReindexSourceToTempIndexBulk) => - Actions.bulkOverwriteTransformedDocuments( + Actions.bulkOverwriteTransformedDocuments({ client, - state.tempIndex, - state.transformedDocs, + index: state.tempIndex, + transformedDocs: state.transformedDocs, /** * Since we don't run a search against the target index, we disable "refresh" to speed up * the migration process. @@ -97,39 +105,48 @@ export const nextActionMap = (client: ElasticsearchClient, transformRawDocs: Tra * before we reach out to the OUTDATED_DOCUMENTS_SEARCH_OPEN_PIT step. * Right now, it's performed during REFRESH_TARGET step. */ - false - ), + refresh: false, + }), SET_TEMP_WRITE_BLOCK: (state: SetTempWriteBlock) => - Actions.setWriteBlock(client, state.tempIndex), + Actions.setWriteBlock({ client, index: state.tempIndex }), CLONE_TEMP_TO_TARGET: (state: CloneTempToSource) => - Actions.cloneIndex(client, state.tempIndex, state.targetIndex), - REFRESH_TARGET: (state: RefreshTarget) => Actions.refreshIndex(client, state.targetIndex), + Actions.cloneIndex({ client, source: state.tempIndex, target: state.targetIndex }), + REFRESH_TARGET: (state: RefreshTarget) => + Actions.refreshIndex({ client, targetIndex: state.targetIndex }), UPDATE_TARGET_MAPPINGS: (state: UpdateTargetMappingsState) => - Actions.updateAndPickupMappings(client, state.targetIndex, state.targetIndexMappings), + Actions.updateAndPickupMappings({ + client, + index: state.targetIndex, + mappings: state.targetIndexMappings, + }), UPDATE_TARGET_MAPPINGS_WAIT_FOR_TASK: (state: UpdateTargetMappingsWaitForTaskState) => - Actions.waitForPickupUpdatedMappingsTask(client, state.updateTargetMappingsTaskId, '60s'), + Actions.waitForPickupUpdatedMappingsTask({ + client, + taskId: state.updateTargetMappingsTaskId, + timeout: '60s', + }), OUTDATED_DOCUMENTS_SEARCH_OPEN_PIT: (state: OutdatedDocumentsSearchOpenPit) => - Actions.openPit(client, state.targetIndex), + Actions.openPit({ client, index: state.targetIndex }), OUTDATED_DOCUMENTS_SEARCH_READ: (state: OutdatedDocumentsSearchRead) => - Actions.readWithPit( + Actions.readWithPit({ client, - state.pitId, + pitId: state.pitId, // search for outdated documents only - state.outdatedDocumentsQuery, - state.batchSize, - state.lastHitSortValue - ), + query: state.outdatedDocumentsQuery, + batchSize: state.batchSize, + searchAfter: state.lastHitSortValue, + }), OUTDATED_DOCUMENTS_SEARCH_CLOSE_PIT: (state: OutdatedDocumentsSearchClosePit) => - Actions.closePit(client, state.pitId), + Actions.closePit({ client, pitId: state.pitId }), OUTDATED_DOCUMENTS_REFRESH: (state: OutdatedDocumentsRefresh) => - Actions.refreshIndex(client, state.targetIndex), + Actions.refreshIndex({ client, targetIndex: state.targetIndex }), OUTDATED_DOCUMENTS_TRANSFORM: (state: OutdatedDocumentsTransform) => - Actions.transformDocs(transformRawDocs, state.outdatedDocuments), + Actions.transformDocs({ transformRawDocs, outdatedDocuments: state.outdatedDocuments }), TRANSFORMED_DOCUMENTS_BULK_INDEX: (state: TransformedDocumentsBulkIndex) => - Actions.bulkOverwriteTransformedDocuments( + Actions.bulkOverwriteTransformedDocuments({ client, - state.targetIndex, - state.transformedDocs, + index: state.targetIndex, + transformedDocs: state.transformedDocs, /** * Since we don't run a search against the target index, we disable "refresh" to speed up * the migration process. @@ -137,29 +154,33 @@ export const nextActionMap = (client: ElasticsearchClient, transformRawDocs: Tra * before we reach out to the MARK_VERSION_INDEX_READY step. * Right now, it's performed during OUTDATED_DOCUMENTS_REFRESH step. */ - false - ), + refresh: false, + }), MARK_VERSION_INDEX_READY: (state: MarkVersionIndexReady) => - Actions.updateAliases(client, state.versionIndexReadyActions.value), + Actions.updateAliases({ client, aliasActions: state.versionIndexReadyActions.value }), MARK_VERSION_INDEX_READY_CONFLICT: (state: MarkVersionIndexReadyConflict) => - Actions.fetchIndices(client, [state.currentAlias, state.versionAlias]), + Actions.fetchIndices({ client, indicesToFetch: [state.currentAlias, state.versionAlias] }), LEGACY_SET_WRITE_BLOCK: (state: LegacySetWriteBlockState) => - Actions.setWriteBlock(client, state.legacyIndex), + Actions.setWriteBlock({ client, index: state.legacyIndex }), LEGACY_CREATE_REINDEX_TARGET: (state: LegacyCreateReindexTargetState) => - Actions.createIndex(client, state.sourceIndex.value, state.legacyReindexTargetMappings), + Actions.createIndex({ + client, + indexName: state.sourceIndex.value, + mappings: state.legacyReindexTargetMappings, + }), LEGACY_REINDEX: (state: LegacyReindexState) => - Actions.reindex( + Actions.reindex({ client, - state.legacyIndex, - state.sourceIndex.value, - state.preMigrationScript, - false, - state.unusedTypesQuery - ), + sourceIndex: state.legacyIndex, + targetIndex: state.sourceIndex.value, + reindexScript: state.preMigrationScript, + requireAlias: false, + unusedTypesQuery: state.unusedTypesQuery, + }), LEGACY_REINDEX_WAIT_FOR_TASK: (state: LegacyReindexWaitForTaskState) => - Actions.waitForReindexTask(client, state.legacyReindexTaskId, '60s'), + Actions.waitForReindexTask({ client, taskId: state.legacyReindexTaskId, timeout: '60s' }), LEGACY_DELETE: (state: LegacyDeleteState) => - Actions.updateAliases(client, state.legacyPreMigrationDoneActions), + Actions.updateAliases({ client, aliasActions: state.legacyPreMigrationDoneActions }), }; };