Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Ml] Prevent snapshot failure blocking migration #37493

Merged
merged 1 commit into from
Jan 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
Expand Down Expand Up @@ -369,7 +370,14 @@ public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener<Boolean> listen
indexResponse -> {
listener.onResponse(indexResponse.getResult() == DocWriteResponse.Result.CREATED);
},
listener::onFailure),
e -> {
if (e instanceof VersionConflictEngineException) {
// the snapshot already exists
listener.onResponse(Boolean.TRUE);
} else {
listener.onFailure(e);
}
}),
client::index
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
package org.elasticsearch.xpack.ml.integration;

import org.elasticsearch.Version;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
Expand Down Expand Up @@ -178,6 +181,59 @@ public void testMigrateConfigs() throws InterruptedException, IOException {
assertEquals("df-1", datafeedsHolder.get().get(0).getId());
}

public void testExistingSnapshotDoesNotBlockMigration() throws InterruptedException {
// index a doc with the same Id as the config snapshot
IndexRequestBuilder indexRequest = client().prepareIndex(AnomalyDetectorsIndex.jobStateIndexName(),
ElasticsearchMappings.DOC_TYPE, "ml-config")
.setSource(Collections.singletonMap("a_field", "a_value"))
.setOpType(DocWriteRequest.OpType.CREATE)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

indexRequest.execute().actionGet();

// define the configs
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
mlMetadata.putJob(buildJobBuilder("job-foo").build(), false);

MetaData.Builder metaData = MetaData.builder();
RoutingTable.Builder routingTable = RoutingTable.builder();
addMlConfigIndex(metaData, routingTable);
ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
.metaData(metaData.putCustom(MlMetadata.TYPE, mlMetadata.build()))
.routingTable(routingTable.build())
.build();

doAnswer(invocation -> {
ClusterStateUpdateTask listener = (ClusterStateUpdateTask) invocation.getArguments()[1];
listener.clusterStateProcessed("source", mock(ClusterState.class), mock(ClusterState.class));
return null;
}).when(clusterService).submitStateUpdateTask(eq("remove-migrated-ml-configs"), any());

AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
AtomicReference<Boolean> responseHolder = new AtomicReference<>();

// do the migration
MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService);
// writing the snapshot should fail because the doc already exists
// in which case the migration should continue
blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener),
responseHolder, exceptionHolder);

assertNull(exceptionHolder.get());
assertTrue(responseHolder.get());

// check the jobs have been migrated
AtomicReference<List<Job.Builder>> jobsHolder = new AtomicReference<>();
JobConfigProvider jobConfigProvider = new JobConfigProvider(client());
blockingCall(actionListener -> jobConfigProvider.expandJobs("*", true, true, actionListener),
jobsHolder, exceptionHolder);

assertNull(exceptionHolder.get());
assertThat(jobsHolder.get(), hasSize(1));
assertTrue(jobsHolder.get().get(0).build().getCustomSettings().containsKey(MlConfigMigrator.MIGRATED_FROM_VERSION));
assertEquals("job-foo", jobsHolder.get().get(0).build().getId());
}

public void testMigrateConfigs_GivenLargeNumberOfJobsAndDatafeeds() throws InterruptedException {
int jobCount = randomIntBetween(150, 201);
int datafeedCount = randomIntBetween(150, jobCount);
Expand Down