From 17727e590c31a830186bb40551c7ae96415940c2 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Tue, 26 Oct 2021 07:22:36 -0400 Subject: [PATCH] [ML] wait for .ml-state-write alias to be readable (#79731) In tests and actual usage, it is possible that one job creates the .ml-state-write and another starts immediately afterwards, sees that the index is created, and moves on. But, what this means, is that the second job could blast past the check and the job starts/stops/etc. all with the .ml-state-write alias pointing to an index that is not even readable. This commit waits for the index to be yellow before continuing opening the job. closes: #79636 --- .../core/ml/annotations/AnnotationIndex.java | 13 ++++--- .../persistence/AnomalyDetectorsIndex.java | 35 +++++++++++++++++++ .../dataframe/DataFrameAnalyticsManager.java | 9 +++-- .../autodetect/AutodetectProcessManager.java | 16 ++++++--- .../AutodetectProcessManagerTests.java | 31 +++++++++++++++- 5 files changed, 92 insertions(+), 12 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationIndex.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationIndex.java index 9d2b931013952..3f1782911b060 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationIndex.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationIndex.java @@ -12,8 +12,8 @@ import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; @@ -68,10 +68,13 @@ public static void createAnnotationsIndexIfNecessaryAndWaitForYellow(Client clie final ClusterHealthRequest request = Requests.clusterHealthRequest(READ_ALIAS_NAME) .waitForYellowStatus() .masterNodeTimeout(masterNodeTimeout); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, request, - ActionListener.wrap( - r -> finalListener.onResponse(r.isTimedOut() == false), finalListener::onFailure), - client.admin().cluster()::health); + executeAsyncWithOrigin( + client, + ML_ORIGIN, + ClusterHealthAction.INSTANCE, + request, + ActionListener.wrap(r -> finalListener.onResponse(r.isTimedOut() == false), finalListener::onFailure) + ); }, finalListener::onFailure); createAnnotationsIndexIfNecessary(client, state, masterNodeTimeout, annotationsIndexCreatedListener); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java index 6dad58439e7e4..30919dbfcdbdd 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java @@ -8,13 +8,19 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.client.Client; +import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.core.TimeValue; import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias; import org.elasticsearch.xpack.core.template.TemplateUtils; +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; + /** * Methods for handling index naming related functions */ @@ -83,6 +89,35 @@ public static void createStateIndexAndAliasIfNecessary(Client client, ClusterSta finalListener); } + public static void createStateIndexAndAliasIfNecessaryAndWaitForYellow(Client client, + ClusterState state, + IndexNameExpressionResolver resolver, + TimeValue masterNodeTimeout, + final ActionListener finalListener) { + final ActionListener stateIndexAndAliasCreated = ActionListener.wrap(success -> { + final ClusterHealthRequest request = Requests.clusterHealthRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias()) + .waitForYellowStatus() + .masterNodeTimeout(masterNodeTimeout); + executeAsyncWithOrigin( + client, + ML_ORIGIN, + ClusterHealthAction.INSTANCE, + request, + ActionListener.wrap(r -> finalListener.onResponse(r.isTimedOut() == false), finalListener::onFailure) + ); + }, finalListener::onFailure); + + MlIndexAndAlias.createIndexAndAliasIfNecessary( + client, + state, + resolver, + AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX, + AnomalyDetectorsIndex.jobStateIndexWriteAlias(), + masterNodeTimeout, + stateIndexAndAliasCreated + ); + } + public static String wrappedResultsMapping() { return "{\n\"_doc\" : " + resultsMapping() + "\n}"; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java index 5ac6beac166ce..c6e0697a39982 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java @@ -130,8 +130,13 @@ public void execute(DataFrameAnalyticsTask task, ClusterState clusterState, Time ); // Make sure the state index and alias exist - AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(new ParentTaskAssigningClient(client, task.getParentTaskId()), - clusterState, expressionResolver, masterNodeTimeout, stateAliasListener); + AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessaryAndWaitForYellow( + new ParentTaskAssigningClient(client, task.getParentTaskId()), + clusterState, + expressionResolver, + masterNodeTimeout, + stateAliasListener + ); } private void createStatsIndexAndUpdateMappingsIfNecessary(Client client, ClusterState clusterState, TimeValue masterNodeTimeout, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index fd7cd115eb177..cdd8f6820d459 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -525,11 +525,19 @@ public void openJob(JobTask jobTask, ClusterState clusterState, TimeValue master } ); - // Make sure the state index and alias exist + // Make sure the state index and alias exist and are writeable ActionListener resultsMappingUpdateHandler = ActionListener.wrap( - ack -> AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, expressionResolver, masterNodeTimeout, - stateAliasHandler), - e -> closeHandler.accept(e, true) + ack -> AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessaryAndWaitForYellow( + client, + clusterState, + expressionResolver, + masterNodeTimeout, + stateAliasHandler + ), + e -> { + logger.error(new ParameterizedMessage("[{}] ML state index alias could not be updated", jobId), e); + closeHandler.accept(e, true); + } ); // Try adding the results doc mapping - this updates to the latest version if an old mapping is present diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index 0fd0fdaeede77..d08bb28ac4ab1 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -9,6 +9,9 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.AliasMetadata; @@ -22,6 +25,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.env.Environment; @@ -72,6 +76,8 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; @@ -146,13 +152,36 @@ public class AutodetectProcessManagerTests extends ESTestCase { private Quantiles quantiles = new Quantiles("foo", new Date(), "state"); @Before + @SuppressWarnings("unchecked") public void setup() throws Exception { Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build(); client = mock(Client.class); - threadPool = mock(ThreadPool.class); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); when(threadPool.executor(anyString())).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE); + when(client.threadPool()).thenReturn(threadPool); + doAnswer(invocationOnMock -> { + if (invocationOnMock.getArguments()[0] instanceof ActionType) { + ActionType v = (ActionType) invocationOnMock.getArguments()[0]; + ActionListener l = (ActionListener) invocationOnMock.getArguments()[2]; + ParameterizedType parameterizedType = (ParameterizedType) v.getClass().getGenericSuperclass(); + Type t = parameterizedType.getActualTypeArguments()[0]; + if (t.getTypeName().contains("AcknowledgedResponse")) { + ActionListener listener = (ActionListener) l; + listener.onResponse(AcknowledgedResponse.TRUE); + return null; + } + if (t.getTypeName().contains("ClusterHealthResponse")) { + ActionListener listener = (ActionListener) l; + listener.onResponse( + new ClusterHealthResponse("test", new String[0], ClusterState.EMPTY_STATE, 0, 0, 0, TimeValue.ZERO, false) + ); + return null; + } + fail("Mock not configured to handle generic type " + t.getTypeName()); + } + return null; + }).when(client).execute(any(), any(), any()); analysisRegistry = CategorizationAnalyzerTests.buildTestAnalysisRegistry(TestEnvironment.newEnvironment(settings)); jobManager = mock(JobManager.class);