Skip to content

Commit

Permalink
[Ml] Prevent config snapshot failure blocking migration (#37493)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkyle authored Jan 16, 2019
1 parent 0721448 commit 9a1b2eb
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
Expand Down Expand Up @@ -369,7 +370,14 @@ public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener<Boolean> listen
indexResponse -> {
listener.onResponse(indexResponse.getResult() == DocWriteResponse.Result.CREATED);
},
listener::onFailure),
e -> {
if (e instanceof VersionConflictEngineException) {
// the snapshot already exists
listener.onResponse(Boolean.TRUE);
} else {
listener.onFailure(e);
}
}),
client::index
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
package org.elasticsearch.xpack.ml.integration;

import org.elasticsearch.Version;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
Expand Down Expand Up @@ -178,6 +181,59 @@ public void testMigrateConfigs() throws InterruptedException, IOException {
assertEquals("df-1", datafeedsHolder.get().get(0).getId());
}

/**
 * Checks that migration still succeeds when a document with the config
 * snapshot's id ("ml-config") is already present in the job state index.
 * The snapshot write is expected to be rejected because the document
 * exists; the migration must carry on and the job must end up migrated.
 */
public void testExistingSnapshotDoesNotBlockMigration() throws InterruptedException {
    AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
    AtomicReference<Boolean> responseHolder = new AtomicReference<>();
    AtomicReference<List<Job.Builder>> jobsHolder = new AtomicReference<>();

    // Occupy the snapshot's document id up front so the later create-only write collides with it
    IndexRequestBuilder conflictingDoc = client().prepareIndex(
            AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE, "ml-config");
    conflictingDoc.setSource(Collections.singletonMap("a_field", "a_value"));
    conflictingDoc.setOpType(DocWriteRequest.OpType.CREATE);
    conflictingDoc.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
    conflictingDoc.execute().actionGet();

    // Cluster state holding a single job in the ML metadata, plus the ML config index
    MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
    mlMetadata.putJob(buildJobBuilder("job-foo").build(), false);

    MetaData.Builder metaData = MetaData.builder();
    RoutingTable.Builder routingTable = RoutingTable.builder();
    addMlConfigIndex(metaData, routingTable);
    metaData.putCustom(MlMetadata.TYPE, mlMetadata.build());

    ClusterState.Builder stateBuilder = ClusterState.builder(new ClusterName("_name"));
    stateBuilder.metaData(metaData);
    stateBuilder.routingTable(routingTable.build());
    ClusterState clusterState = stateBuilder.build();

    // Have the mocked cluster service report the config-removal update as processed straight away
    doAnswer(call -> {
        ClusterStateUpdateTask updateTask = (ClusterStateUpdateTask) call.getArguments()[1];
        updateTask.clusterStateProcessed("source", mock(ClusterState.class), mock(ClusterState.class));
        return null;
    }).when(clusterService).submitStateUpdateTask(eq("remove-migrated-ml-configs"), any());

    // Run the migration: the snapshot write should hit the existing doc,
    // and the migration is expected to continue regardless
    MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService);
    blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener),
            responseHolder, exceptionHolder);

    assertNull(exceptionHolder.get());
    assertTrue(responseHolder.get());

    // Read the jobs back from the config index and verify the migrated job is there
    JobConfigProvider jobConfigProvider = new JobConfigProvider(client());
    blockingCall(actionListener -> jobConfigProvider.expandJobs("*", true, true, actionListener),
            jobsHolder, exceptionHolder);

    assertNull(exceptionHolder.get());
    assertThat(jobsHolder.get(), hasSize(1));

    Job.Builder migratedJob = jobsHolder.get().get(0);
    assertTrue(migratedJob.build().getCustomSettings().containsKey(MlConfigMigrator.MIGRATED_FROM_VERSION));
    assertEquals("job-foo", migratedJob.build().getId());
}

public void testMigrateConfigs_GivenLargeNumberOfJobsAndDatafeeds() throws InterruptedException {
int jobCount = randomIntBetween(150, 201);
int datafeedCount = randomIntBetween(150, jobCount);
Expand Down

0 comments on commit 9a1b2eb

Please sign in to comment.