From 439b038a68465066a13772fa7257c0445f4c6a06 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Tue, 11 May 2021 20:21:44 +0100 Subject: [PATCH] [ML] Revert model snapshot now waits for annotations index (#72947) Reverting a model snapshot with the delete_intervening_results option deletes system-generated annotations that are more recent than the model snapshot. Doing this relies on the annotations index being available, so the revert model snapshot action now waits for this. This problem is more likely to be seen in recent releases, as we now revert to the most recent model snapshot when a job relocates from one node to another, so we are more likely to be reverting a model snapshot at a time when there has been cluster disruption and this could also be causing the annotations index to be temporarily unavailable. Backport of #72926 --- .../core/ml/annotations/AnnotationIndex.java | 47 ++++++++++++++----- .../xpack/ml/MlInitializationService.java | 14 +++--- .../TransportRevertModelSnapshotAction.java | 12 ++++- .../autodetect/AutodetectProcessManager.java | 3 +- .../upgrader/SnapshotUpgradeTaskExecutor.java | 3 +- 5 files changed, 58 insertions(+), 21 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationIndex.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationIndex.java index 95c9fac5478a0..b0a0ec1180163 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationIndex.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationIndex.java @@ -9,16 +9,19 @@ import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.client.Client; +import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; @@ -43,23 +46,45 @@ public class AnnotationIndex { private static final Version HIDDEN_INTRODUCED_VERSION = Version.V_7_7_0; + /** + * Create the .ml-annotations-6 index with correct mappings if it does not already exist. This index is read and written by the UI + * results views, so needs to exist when there might be ML results to view. This method also waits for the index to be ready to search + * before it returns. + */ + public static void createAnnotationsIndexIfNecessaryAndWaitForYellow(Client client, ClusterState state, TimeValue masterNodeTimeout, + final ActionListener finalListener) { + + final ActionListener annotationsIndexCreatedListener = ActionListener.wrap(success -> { + final ClusterHealthRequest request = Requests.clusterHealthRequest(READ_ALIAS_NAME) + .waitForYellowStatus() + .masterNodeTimeout(masterNodeTimeout); + executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, request, + ActionListener.wrap( + r -> finalListener.onResponse(r.isTimedOut() == false), finalListener::onFailure), + client.admin().cluster()::health); + }, finalListener::onFailure); + + createAnnotationsIndexIfNecessary(client, state, masterNodeTimeout, annotationsIndexCreatedListener); + } + /** * Create the .ml-annotations-6 index with correct mappings if it does not already exist. This index is read and written by the UI * results views, so needs to exist when there might be ML results to view. */ - public static void createAnnotationsIndexIfNecessary(Client client, ClusterState state, final ActionListener finalListener) { + public static void createAnnotationsIndexIfNecessary(Client client, ClusterState state, TimeValue masterNodeTimeout, + final ActionListener finalListener) { boolean isHiddenAttributeAvailable = state.nodes().getMinNodeVersion().onOrAfter(HIDDEN_INTRODUCED_VERSION); - final ActionListener checkMappingsListener = ActionListener.wrap(success -> { - ElasticsearchMappings.addDocMappingIfMissing( - WRITE_ALIAS_NAME, - AnnotationIndex::annotationsMapping, - client, - state, - MasterNodeRequest.DEFAULT_MASTER_NODE_TIMEOUT, - finalListener); - }, finalListener::onFailure); + final ActionListener checkMappingsListener = ActionListener.wrap(success -> + ElasticsearchMappings.addDocMappingIfMissing( + WRITE_ALIAS_NAME, + AnnotationIndex::annotationsMapping, + client, + state, + masterNodeTimeout, + finalListener), + finalListener::onFailure); final ActionListener createAliasListener = ActionListener.wrap(success -> { IndicesAliasesRequest.AliasActions addReadAliasAction = diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java index 3580293d6be82..b51df785b6e09 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterStateListener; @@ -96,12 +97,13 @@ public void clusterChanged(ClusterChangedEvent event) { // The atomic flag prevents multiple simultaneous attempts to create the // index if there is a flurry of cluster state updates in quick succession if (this.isMaster && isIndexCreationInProgress.compareAndSet(false, true)) { - AnnotationIndex.createAnnotationsIndexIfNecessary(client, event.state(), ActionListener.wrap( - r -> isIndexCreationInProgress.set(false), - e -> { - isIndexCreationInProgress.set(false); - logger.error("Error creating ML annotations index or aliases", e); - })); + AnnotationIndex.createAnnotationsIndexIfNecessary(client, event.state(), MasterNodeRequest.DEFAULT_MASTER_NODE_TIMEOUT, + ActionListener.wrap( + r -> isIndexCreationInProgress.set(false), + e -> { + isIndexCreationInProgress.set(false); + logger.error("Error creating ML annotations index or aliases", e); + })); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java index f97f78bfb38db..492da64a668d9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java @@ -27,6 +27,7 @@ import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.core.ml.annotations.Annotation; +import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; @@ -83,8 +84,8 @@ protected void masterOperation(RevertModelSnapshotAction.Request request, Cluste logger.debug("Received request to revert to snapshot id '{}' for job '{}', deleting intervening results: {}", request.getSnapshotId(), jobId, request.getDeleteInterveningResults()); - // 4. Revert the state - ActionListener configMappingUpdateListener = ActionListener.wrap( + // 5. Revert the state + ActionListener annotationsIndexUpdateListener = ActionListener.wrap( r -> { PersistentTasksCustomMetadata tasks = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); JobState jobState = MlTasks.getJobState(jobId, tasks); @@ -116,6 +117,13 @@ protected void masterOperation(RevertModelSnapshotAction.Request request, Cluste listener::onFailure ); + // 4. Ensure the annotations index mappings are up to date + ActionListener configMappingUpdateListener = ActionListener.wrap( + r -> AnnotationIndex.createAnnotationsIndexIfNecessaryAndWaitForYellow(client, state, request.masterNodeTimeout(), + annotationsIndexUpdateListener), + listener::onFailure + ); + // 3. Ensure the config index mappings are up to date ActionListener jobExistsListener = ActionListener.wrap( r -> ElasticsearchMappings.addDocMappingIfMissing(MlConfigIndex.indexName(), MlConfigIndex::mapping, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 8f99370cc153b..9a097672471c5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -504,7 +504,8 @@ public void openJob(JobTask jobTask, ClusterState clusterState, TimeValue master ); // Create the annotations index if necessary - this also updates the mappings if an old mapping is present - AnnotationIndex.createAnnotationsIndexIfNecessary(client, clusterState, annotationsIndexUpdateHandler); + AnnotationIndex.createAnnotationsIndexIfNecessaryAndWaitForYellow(client, clusterState, masterNodeTimeout, + annotationsIndexUpdateHandler); } private void startProcess(JobTask jobTask, Job job, BiConsumer closeHandler) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java index 0ff307867317f..43396eb7b16f1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/snapshot/upgrader/SnapshotUpgradeTaskExecutor.java @@ -220,7 +220,8 @@ protected void nodeOperation(AllocatedPersistentTask task, SnapshotUpgradeTaskPa ); // Create the annotations index if necessary - this also updates the mappings if an old mapping is present - AnnotationIndex.createAnnotationsIndexIfNecessary(client, clusterState, annotationsIndexUpdateHandler); + AnnotationIndex.createAnnotationsIndexIfNecessaryAndWaitForYellow(client, clusterState, MlTasks.PERSISTENT_TASK_MASTER_NODE_TIMEOUT, + annotationsIndexUpdateHandler); } @Override