diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java index f497a6c7063fe..28e6f34cd9e3f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; @@ -369,7 +370,14 @@ public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener listen indexResponse -> { listener.onResponse(indexResponse.getResult() == DocWriteResponse.Result.CREATED); }, - listener::onFailure), + e -> { + if (e instanceof VersionConflictEngineException) { + // the snapshot already exists + listener.onResponse(Boolean.TRUE); + } else { + listener.onFailure(e); + } + }), client::index ); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java index 87c0e4ac824ce..55e8dc163dc4d 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java @@ -6,8 +6,11 @@ package org.elasticsearch.xpack.ml.integration; import org.elasticsearch.Version; +import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; @@ -178,6 +181,59 @@ public void testMigrateConfigs() throws InterruptedException, IOException { assertEquals("df-1", datafeedsHolder.get().get(0).getId()); } + public void testExistingSnapshotDoesNotBlockMigration() throws InterruptedException { + // index a doc with the same Id as the config snapshot + IndexRequestBuilder indexRequest = client().prepareIndex(AnomalyDetectorsIndex.jobStateIndexName(), + ElasticsearchMappings.DOC_TYPE, "ml-config") + .setSource(Collections.singletonMap("a_field", "a_value")) + .setOpType(DocWriteRequest.OpType.CREATE) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + indexRequest.execute().actionGet(); + + // define the configs + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + mlMetadata.putJob(buildJobBuilder("job-foo").build(), false); + + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addMlConfigIndex(metaData, routingTable); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(metaData.putCustom(MlMetadata.TYPE, mlMetadata.build())) + .routingTable(routingTable.build()) + .build(); + + doAnswer(invocation -> { + ClusterStateUpdateTask listener = (ClusterStateUpdateTask) invocation.getArguments()[1]; + listener.clusterStateProcessed("source", mock(ClusterState.class), mock(ClusterState.class)); + return null; + }).when(clusterService).submitStateUpdateTask(eq("remove-migrated-ml-configs"), any()); + + AtomicReference exceptionHolder = new AtomicReference<>(); + AtomicReference responseHolder = new AtomicReference<>(); + + // do the migration + MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService); + // writing the snapshot should fail because the doc already exists + // in which case the migration should continue + blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener), + responseHolder, exceptionHolder); + + assertNull(exceptionHolder.get()); + assertTrue(responseHolder.get()); + + // check the jobs have been migrated + AtomicReference> jobsHolder = new AtomicReference<>(); + JobConfigProvider jobConfigProvider = new JobConfigProvider(client()); + blockingCall(actionListener -> jobConfigProvider.expandJobs("*", true, true, actionListener), + jobsHolder, exceptionHolder); + + assertNull(exceptionHolder.get()); + assertThat(jobsHolder.get(), hasSize(1)); + assertTrue(jobsHolder.get().get(0).build().getCustomSettings().containsKey(MlConfigMigrator.MIGRATED_FROM_VERSION)); + assertEquals("job-foo", jobsHolder.get().get(0).build().getId()); + } + public void testMigrateConfigs_GivenLargeNumberOfJobsAndDatafeeds() throws InterruptedException { int jobCount = randomIntBetween(150, 201); int datafeedCount = randomIntBetween(150, jobCount);