Skip to content

Commit

Permalink
[7.9][ML] Update reindexing task progress before persisting job progress (elastic#61868)
Browse files Browse the repository at this point in the history

This fixes a bug introduced by elastic#61782. In that PR I thought I could
simplify the persistence of progress by reading the progress straight
from the stats holder in the task instead of calling the get
stats action. However, I overlooked that the reindexing task progress
can then be stale, because it is only updated when the get stats API
is called.

This commit fixes that by updating the reindexing task progress
before persisting the job progress. This seems to be much more
lightweight than executing a get stats request.

Closes elastic#61852

Backport of elastic#61868
  • Loading branch information
dimitris-athanasiou committed Sep 2, 2020
1 parent e9ff88e commit 9ba16de
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ protected void taskOperation(GetDataFrameAnalyticsStatsAction.Request request, D
}, listener::onFailure
);

// We must update the progress of the reindexing task as it might be stale
task.updateReindexTaskProgress(reindexingProgressListener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ void persistProgress(Client client, String jobId, Runnable runnable) {

String progressDocId = StoredProgress.documentId(jobId);

// Step 3: Run the runnable provided as the argument
// Step 4: Run the runnable provided as the argument
ActionListener<IndexResponse> indexProgressDocListener = ActionListener.wrap(
indexResponse -> {
LOGGER.debug("[{}] Successfully indexed progress document", jobId);
Expand All @@ -297,7 +297,7 @@ void persistProgress(Client client, String jobId, Runnable runnable) {
}
);

// Step 2: Create or update the progress document:
// Step 3: Create or update the progress document:
// - if the document did not exist, create the new one in the current write index
// - if the document did exist, update it in the index where it resides (not necessarily the current write index)
ActionListener<SearchResponse> searchFormerProgressDocListener = ActionListener.wrap(
Expand All @@ -324,14 +324,26 @@ void persistProgress(Client client, String jobId, Runnable runnable) {
}
);

// Step 1: Search for existing progress document in .ml-state*
SearchRequest searchRequest =
new SearchRequest(AnomalyDetectorsIndex.jobStateIndexPattern())
.source(
new SearchSourceBuilder()
.size(1)
.query(new IdsQueryBuilder().addIds(progressDocId)));
executeAsyncWithOrigin(client, ML_ORIGIN, SearchAction.INSTANCE, searchRequest, searchFormerProgressDocListener);
// Step 2: Search for existing progress document in .ml-state*
ActionListener<Void> reindexProgressUpdateListener = ActionListener.wrap(
aVoid -> {
SearchRequest searchRequest =
new SearchRequest(AnomalyDetectorsIndex.jobStateIndexPattern())
.source(
new SearchSourceBuilder()
.size(1)
.query(new IdsQueryBuilder().addIds(progressDocId)));
executeAsyncWithOrigin(client, ML_ORIGIN, SearchAction.INSTANCE, searchRequest, searchFormerProgressDocListener);
},
e -> {
LOGGER.error(new ParameterizedMessage(
"[{}] cannot persist progress as an error occurred while updating reindexing task progress", taskParams.getId()), e);
runnable.run();
}
);

// Step 1: Update reindexing progress as it could be stale
updateReindexTaskProgress(reindexProgressUpdateListener);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.List;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
Expand Down Expand Up @@ -208,8 +209,9 @@ public void testSetFailed() throws IOException {
PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, mock(ThreadPool.class), client);
TaskManager taskManager = mock(TaskManager.class);

// We leave reindexing progress here to zero in order to check it is updated before it is persisted
List<PhaseProgress> progress = Arrays.asList(
new PhaseProgress(ProgressTracker.REINDEXING, 100),
new PhaseProgress(ProgressTracker.REINDEXING, 0),
new PhaseProgress(ProgressTracker.LOADING_DATA, 100),
new PhaseProgress(ProgressTracker.WRITING_RESULTS, 30));

Expand All @@ -231,6 +233,7 @@ public void testSetFailed() throws IOException {
new DataFrameAnalyticsTask(
123, "type", "action", null, Collections.emptyMap(), client, clusterService, analyticsManager, auditor, taskParams);
task.init(persistentTasksService, taskManager, "task-id", 42);
task.setReindexingFinished();
Exception exception = new Exception("some exception");

task.setFailed(exception);
Expand All @@ -250,7 +253,8 @@ public void testSetFailed() throws IOException {
try (XContentParser parser = JsonXContent.jsonXContent.createParser(
NamedXContentRegistry.EMPTY, DeprecationHandler.IGNORE_DEPRECATIONS, indexRequest.source().utf8ToString())) {
StoredProgress parsedProgress = StoredProgress.PARSER.apply(parser, null);
assertThat(parsedProgress.get(), equalTo(progress));
assertThat(parsedProgress.get(), hasSize(3));
assertThat(parsedProgress.get().get(0), equalTo(new PhaseProgress("reindexing", 100)));
}

verify(client).execute(
Expand All @@ -259,7 +263,7 @@ public void testSetFailed() throws IOException {
"task-id", 42, new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.FAILED, 42, "some exception"))),
any());

verifyNoMoreInteractions(client, clusterService, analyticsManager, auditor, taskManager);
verifyNoMoreInteractions(client, analyticsManager, auditor, taskManager);
}

@SuppressWarnings("unchecked")
Expand Down

0 comments on commit 9ba16de

Please sign in to comment.