Skip to content

Commit

Permalink
[ML] Update reindexing task progress before persisting job progress
Browse files Browse the repository at this point in the history
This fixes a bug introduced by elastic#61782. In that PR I thought I could
simplify the persistence of progress by using the progress straight
from the stats holder in the task instead of calling the get
stats action. However, I overlooked that the reindexing task's progress
can then be stale, because it is only updated when the get stats API
is called.

This commit fixes the problem by updating the reindexing task progress
before persisting the job progress. This seems to be much more
lightweight than calling the get stats action.

Closes elastic#61852
  • Loading branch information
dimitris-athanasiou committed Sep 2, 2020
1 parent 3049e55 commit 32d3427
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ void persistProgress(Client client, String jobId, Runnable runnable) {

String progressDocId = StoredProgress.documentId(jobId);

// Step 3: Run the runnable provided as the argument
// Step 4: Run the runnable provided as the argument
ActionListener<IndexResponse> indexProgressDocListener = ActionListener.wrap(
indexResponse -> {
LOGGER.debug("[{}] Successfully indexed progress document", jobId);
Expand All @@ -303,7 +303,7 @@ void persistProgress(Client client, String jobId, Runnable runnable) {
}
);

// Step 2: Create or update the progress document:
// Step 3: Create or update the progress document:
// - if the document did not exist, create the new one in the current write index
// - if the document did exist, update it in the index where it resides (not necessarily the current write index)
ActionListener<SearchResponse> searchFormerProgressDocListener = ActionListener.wrap(
Expand Down Expand Up @@ -331,14 +331,26 @@ void persistProgress(Client client, String jobId, Runnable runnable) {
}
);

// Step 1: Search for existing progress document in .ml-state*
SearchRequest searchRequest =
new SearchRequest(AnomalyDetectorsIndex.jobStateIndexPattern())
.source(
new SearchSourceBuilder()
.size(1)
.query(new IdsQueryBuilder().addIds(progressDocId)));
executeAsyncWithOrigin(client, ML_ORIGIN, SearchAction.INSTANCE, searchRequest, searchFormerProgressDocListener);
// Step 2: Search for existing progress document in .ml-state*
ActionListener<Void> reindexProgressUpdateListener = ActionListener.wrap(
aVoid -> {
SearchRequest searchRequest =
new SearchRequest(AnomalyDetectorsIndex.jobStateIndexPattern())
.source(
new SearchSourceBuilder()
.size(1)
.query(new IdsQueryBuilder().addIds(progressDocId)));
executeAsyncWithOrigin(client, ML_ORIGIN, SearchAction.INSTANCE, searchRequest, searchFormerProgressDocListener);
},
e -> {
LOGGER.error(new ParameterizedMessage(
"[{}] cannot persist progress as an error occurred while updating reindexing task progress", taskParams.getId()), e);
runnable.run();
}
);

// Step 1: Update reindexing progress as it could be stale
updateReindexTaskProgress(reindexProgressUpdateListener);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
Expand Down Expand Up @@ -216,8 +217,9 @@ private void testSetFailed(boolean nodeShuttingDown) throws IOException {
PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, mock(ThreadPool.class), client);
TaskManager taskManager = mock(TaskManager.class);

// We leave reindexing progress here to zero in order to check it is updated before it is persisted
List<PhaseProgress> progress = List.of(
new PhaseProgress(ProgressTracker.REINDEXING, 100),
new PhaseProgress(ProgressTracker.REINDEXING, 0),
new PhaseProgress(ProgressTracker.LOADING_DATA, 100),
new PhaseProgress(ProgressTracker.WRITING_RESULTS, 30));

Expand All @@ -239,6 +241,7 @@ private void testSetFailed(boolean nodeShuttingDown) throws IOException {
new DataFrameAnalyticsTask(
123, "type", "action", null, Map.of(), client, clusterService, analyticsManager, auditor, taskParams);
task.init(persistentTasksService, taskManager, "task-id", 42);
task.setReindexingFinished();
Exception exception = new Exception("some exception");

task.setFailed(exception);
Expand All @@ -260,7 +263,8 @@ private void testSetFailed(boolean nodeShuttingDown) throws IOException {
try (XContentParser parser = JsonXContent.jsonXContent.createParser(
NamedXContentRegistry.EMPTY, DeprecationHandler.IGNORE_DEPRECATIONS, indexRequest.source().utf8ToString())) {
StoredProgress parsedProgress = StoredProgress.PARSER.apply(parser, null);
assertThat(parsedProgress.get(), equalTo(progress));
assertThat(parsedProgress.get(), hasSize(3));
assertThat(parsedProgress.get().get(0), equalTo(new PhaseProgress("reindexing", 100)));
}

verify(client).execute(
Expand All @@ -269,7 +273,7 @@ private void testSetFailed(boolean nodeShuttingDown) throws IOException {
"task-id", 42, new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.FAILED, 42, "some exception"))),
any());
}
verifyNoMoreInteractions(client, clusterService, analyticsManager, auditor, taskManager);
verifyNoMoreInteractions(client, analyticsManager, auditor, taskManager);
}

@SuppressWarnings("unchecked")
Expand Down

0 comments on commit 32d3427

Please sign in to comment.