Skip to content

Commit

Permalink
[ML] wait for .ml-state-write alias to be readable (#79731)
Browse files Browse the repository at this point in the history
In tests and actual usage, it is possible that one job creates the .ml-state-write and another starts immediately afterwards, sees that the index is created, and moves on. But, what this means, is that the second job could blast past the check and the job starts/stops/etc. all with the .ml-state-write alias pointing to an index that is not even readable.

This commit waits for the index to be yellow before continuing opening the job.

closes: #79636
  • Loading branch information
benwtrent authored Oct 26, 2021
1 parent e04911b commit 17727e5
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
Expand Down Expand Up @@ -68,10 +68,13 @@ public static void createAnnotationsIndexIfNecessaryAndWaitForYellow(Client clie
final ClusterHealthRequest request = Requests.clusterHealthRequest(READ_ALIAS_NAME)
.waitForYellowStatus()
.masterNodeTimeout(masterNodeTimeout);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, request,
ActionListener.<ClusterHealthResponse>wrap(
r -> finalListener.onResponse(r.isTimedOut() == false), finalListener::onFailure),
client.admin().cluster()::health);
executeAsyncWithOrigin(
client,
ML_ORIGIN,
ClusterHealthAction.INSTANCE,
request,
ActionListener.wrap(r -> finalListener.onResponse(r.isTimedOut() == false), finalListener::onFailure)
);
}, finalListener::onFailure);

createAnnotationsIndexIfNecessary(client, state, masterNodeTimeout, annotationsIndexCreatedListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,19 @@

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias;
import org.elasticsearch.xpack.core.template.TemplateUtils;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

/**
* Methods for handling index naming related functions
*/
Expand Down Expand Up @@ -83,6 +89,35 @@ public static void createStateIndexAndAliasIfNecessary(Client client, ClusterSta
finalListener);
}

public static void createStateIndexAndAliasIfNecessaryAndWaitForYellow(Client client,
ClusterState state,
IndexNameExpressionResolver resolver,
TimeValue masterNodeTimeout,
final ActionListener<Boolean> finalListener) {
final ActionListener<Boolean> stateIndexAndAliasCreated = ActionListener.wrap(success -> {
final ClusterHealthRequest request = Requests.clusterHealthRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias())
.waitForYellowStatus()
.masterNodeTimeout(masterNodeTimeout);
executeAsyncWithOrigin(
client,
ML_ORIGIN,
ClusterHealthAction.INSTANCE,
request,
ActionListener.wrap(r -> finalListener.onResponse(r.isTimedOut() == false), finalListener::onFailure)
);
}, finalListener::onFailure);

MlIndexAndAlias.createIndexAndAliasIfNecessary(
client,
state,
resolver,
AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX,
AnomalyDetectorsIndex.jobStateIndexWriteAlias(),
masterNodeTimeout,
stateIndexAndAliasCreated
);
}

public static String wrappedResultsMapping() {
return "{\n\"_doc\" : " + resultsMapping() + "\n}";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,13 @@ public void execute(DataFrameAnalyticsTask task, ClusterState clusterState, Time
);

// Make sure the state index and alias exist
AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(new ParentTaskAssigningClient(client, task.getParentTaskId()),
clusterState, expressionResolver, masterNodeTimeout, stateAliasListener);
AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessaryAndWaitForYellow(
new ParentTaskAssigningClient(client, task.getParentTaskId()),
clusterState,
expressionResolver,
masterNodeTimeout,
stateAliasListener
);
}

private void createStatsIndexAndUpdateMappingsIfNecessary(Client client, ClusterState clusterState, TimeValue masterNodeTimeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,11 +525,19 @@ public void openJob(JobTask jobTask, ClusterState clusterState, TimeValue master
}
);

// Make sure the state index and alias exist
// Make sure the state index and alias exist and are writeable
ActionListener<Boolean> resultsMappingUpdateHandler = ActionListener.wrap(
ack -> AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, expressionResolver, masterNodeTimeout,
stateAliasHandler),
e -> closeHandler.accept(e, true)
ack -> AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessaryAndWaitForYellow(
client,
clusterState,
expressionResolver,
masterNodeTimeout,
stateAliasHandler
),
e -> {
logger.error(new ParameterizedMessage("[{}] ML state index alias could not be updated", jobId), e);
closeHandler.accept(e, true);
}
);

// Try adding the results doc mapping - this updates to the latest version if an old mapping is present
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasMetadata;
Expand All @@ -22,6 +25,7 @@
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.env.Environment;
Expand Down Expand Up @@ -72,6 +76,8 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -146,13 +152,36 @@ public class AutodetectProcessManagerTests extends ESTestCase {
private Quantiles quantiles = new Quantiles("foo", new Date(), "state");

@Before
@SuppressWarnings("unchecked")
public void setup() throws Exception {
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build();
client = mock(Client.class);

threadPool = mock(ThreadPool.class);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
when(threadPool.executor(anyString())).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
when(client.threadPool()).thenReturn(threadPool);
doAnswer(invocationOnMock -> {
if (invocationOnMock.getArguments()[0] instanceof ActionType<?>) {
ActionType<?> v = (ActionType<?>) invocationOnMock.getArguments()[0];
ActionListener<?> l = (ActionListener<?>) invocationOnMock.getArguments()[2];
ParameterizedType parameterizedType = (ParameterizedType) v.getClass().getGenericSuperclass();
Type t = parameterizedType.getActualTypeArguments()[0];
if (t.getTypeName().contains("AcknowledgedResponse")) {
ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) l;
listener.onResponse(AcknowledgedResponse.TRUE);
return null;
}
if (t.getTypeName().contains("ClusterHealthResponse")) {
ActionListener<ClusterHealthResponse> listener = (ActionListener<ClusterHealthResponse>) l;
listener.onResponse(
new ClusterHealthResponse("test", new String[0], ClusterState.EMPTY_STATE, 0, 0, 0, TimeValue.ZERO, false)
);
return null;
}
fail("Mock not configured to handle generic type " + t.getTypeName());
}
return null;
}).when(client).execute(any(), any(), any());

analysisRegistry = CategorizationAnalyzerTests.buildTestAnalysisRegistry(TestEnvironment.newEnvironment(settings));
jobManager = mock(JobManager.class);
Expand Down

0 comments on commit 17727e5

Please sign in to comment.