Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Job in index: Get datafeed and job stats from index #34591

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,33 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class TransportGetDatafeedsStatsAction extends TransportMasterNodeReadAction<GetDatafeedsStatsAction.Request,
GetDatafeedsStatsAction.Response> {

private final DatafeedConfigProvider datafeedConfigProvider;

@Inject
public TransportGetDatafeedsStatsAction(Settings settings, TransportService transportService,
ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
IndexNameExpressionResolver indexNameExpressionResolver,
DatafeedConfigProvider datafeedConfigProvider) {
super(settings, GetDatafeedsStatsAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, GetDatafeedsStatsAction.Request::new);
this.datafeedConfigProvider = datafeedConfigProvider;
}

@Override
Expand All @@ -57,16 +60,18 @@ protected void masterOperation(GetDatafeedsStatsAction.Request request, ClusterS
ActionListener<GetDatafeedsStatsAction.Response> listener) throws Exception {
logger.debug("Get stats for datafeed '{}'", request.getDatafeedId());

MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
Set<String> expandedDatafeedIds = mlMetadata.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds());

PersistentTasksCustomMetaData tasksInProgress = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
List<GetDatafeedsStatsAction.Response.DatafeedStats> results = expandedDatafeedIds.stream()
.map(datafeedId -> getDatafeedStats(datafeedId, state, tasksInProgress))
.collect(Collectors.toList());
QueryPage<GetDatafeedsStatsAction.Response.DatafeedStats> statsPage = new QueryPage<>(results, results.size(),
DatafeedConfig.RESULTS_FIELD);
listener.onResponse(new GetDatafeedsStatsAction.Response(statsPage));
datafeedConfigProvider.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds(), ActionListener.wrap(
expandedDatafeedIds -> {
PersistentTasksCustomMetaData tasksInProgress = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
List<GetDatafeedsStatsAction.Response.DatafeedStats> results = expandedDatafeedIds.stream()
.map(datafeedId -> getDatafeedStats(datafeedId, state, tasksInProgress))
.collect(Collectors.toList());
QueryPage<GetDatafeedsStatsAction.Response.DatafeedStats> statsPage = new QueryPage<>(results, results.size(),
DatafeedConfig.RESULTS_FIELD);
listener.onResponse(new GetDatafeedsStatsAction.Response(statsPage));
},
listener::onFailure
));
}

private static GetDatafeedsStatsAction.Response.DatafeedStats getDatafeedStats(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
Expand All @@ -32,7 +32,7 @@
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;

Expand All @@ -54,28 +54,37 @@ public class TransportGetJobsStatsAction extends TransportTasksAction<TransportO
private final ClusterService clusterService;
private final AutodetectProcessManager processManager;
private final JobResultsProvider jobResultsProvider;
private final JobConfigProvider jobConfigProvider;

@Inject
public TransportGetJobsStatsAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ActionFilters actionFilters, ClusterService clusterService,
IndexNameExpressionResolver indexNameExpressionResolver,
AutodetectProcessManager processManager, JobResultsProvider jobResultsProvider) {
AutodetectProcessManager processManager, JobResultsProvider jobResultsProvider,
JobConfigProvider jobConfigProvider) {
super(settings, GetJobsStatsAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, GetJobsStatsAction.Request::new, GetJobsStatsAction.Response::new,
ThreadPool.Names.MANAGEMENT);
this.clusterService = clusterService;
this.processManager = processManager;
this.jobResultsProvider = jobResultsProvider;
this.jobConfigProvider = jobConfigProvider;
}

@Override
protected void doExecute(Task task, GetJobsStatsAction.Request request, ActionListener<GetJobsStatsAction.Response> listener) {
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state());
request.setExpandedJobsIds(new ArrayList<>(mlMetadata.expandJobIds(request.getJobId(), request.allowNoJobs())));
ActionListener<GetJobsStatsAction.Response> finalListener = listener;
listener = ActionListener.wrap(response -> gatherStatsForClosedJobs(mlMetadata,
request, response, finalListener), listener::onFailure);
super.doExecute(task, request, listener);
protected void doExecute(Task task, GetJobsStatsAction.Request request, ActionListener<GetJobsStatsAction.Response> finalListener) {

jobConfigProvider.expandJobsIds(request.getJobId(), request.allowNoJobs(), ActionListener.wrap(
expandedIds -> {
request.setExpandedJobsIds(new ArrayList<>(expandedIds));
ActionListener<GetJobsStatsAction.Response> jobStatsListener = ActionListener.wrap(
response -> gatherStatsForClosedJobs(request, response, finalListener),
finalListener::onFailure
);
super.doExecute(task, request, jobStatsListener);
},
finalListener::onFailure
));
}

@Override
Expand Down Expand Up @@ -123,21 +132,20 @@ protected void taskOperation(GetJobsStatsAction.Request request, TransportOpenJo

// Up until now we gathered the stats for jobs that were open,
// This method will fetch the stats for missing jobs, that was stored in the jobs index
void gatherStatsForClosedJobs(MlMetadata mlMetadata, GetJobsStatsAction.Request request, GetJobsStatsAction.Response response,
void gatherStatsForClosedJobs(GetJobsStatsAction.Request request, GetJobsStatsAction.Response response,
ActionListener<GetJobsStatsAction.Response> listener) {
List<String> jobIds = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata,
request.getExpandedJobsIds(), response.getResponse().results());
if (jobIds.isEmpty()) {
List<String> closedJobIds = determineJobIdsWithoutLiveStats(request.getExpandedJobsIds(), response.getResponse().results());
if (closedJobIds.isEmpty()) {
listener.onResponse(response);
return;
}

AtomicInteger counter = new AtomicInteger(jobIds.size());
AtomicArray<GetJobsStatsAction.Response.JobStats> jobStats = new AtomicArray<>(jobIds.size());
AtomicInteger counter = new AtomicInteger(closedJobIds.size());
AtomicArray<GetJobsStatsAction.Response.JobStats> jobStats = new AtomicArray<>(closedJobIds.size());
PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
for (int i = 0; i < jobIds.size(); i++) {
for (int i = 0; i < closedJobIds.size(); i++) {
int slot = i;
String jobId = jobIds.get(i);
String jobId = closedJobIds.get(i);
gatherForecastStats(jobId, forecastStats -> {
gatherDataCountsAndModelSizeStats(jobId, (dataCounts, modelSizeStats) -> {
JobState jobState = MlTasks.getJobState(jobId, tasks);
Expand Down Expand Up @@ -180,11 +188,9 @@ static TimeValue durationToTimeValue(Optional<Duration> duration) {
}
}

static List<String> determineNonDeletedJobIdsWithoutLiveStats(MlMetadata mlMetadata,
List<String> requestedJobIds,
List<GetJobsStatsAction.Response.JobStats> stats) {
static List<String> determineJobIdsWithoutLiveStats(List<String> requestedJobIds,
List<GetJobsStatsAction.Response.JobStats> stats) {
Set<String> excludeJobIds = stats.stream().map(GetJobsStatsAction.Response.JobStats::getJobId).collect(Collectors.toSet());
return requestedJobIds.stream().filter(jobId -> !excludeJobIds.contains(jobId) &&
!mlMetadata.isJobDeleted(jobId)).collect(Collectors.toList());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the tricky bit, as it is no longer easy to find out whether a job is being deleted without making an async call for each job. Before this change, if a job was being deleted then the calls to gatherForecastStats and gatherDataCountsAndModelSizeStats would not be made. It is still safe to make those calls: if those documents have been deleted, the response accepts null for forecast stats and model size stats, and gathering data counts returns a default-constructed object if the document is not found. Some jobs take a long time to delete, and the job config document is the last thing to be removed, so it is possible that the forecast stats are in the process of being deleted when subsequent calls are made during a long-running delete. It's a reasonable argument to say the API is doing what it says it is doing in this case. Compare with GET jobs, which returns all jobs including ones that are being deleted.

One helpful change would be to add an excludeDeleting parameter to jobConfigProvider.expandJobsIds when it is called in doExecute, which would make the behaviour closer to the current behaviour, albeit with a race condition.

return requestedJobIds.stream().filter(jobId -> !excludeJobIds.contains(jobId)).collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;

Expand All @@ -18,65 +17,46 @@
import java.util.List;
import java.util.Optional;

import static org.elasticsearch.xpack.ml.action.TransportGetJobsStatsAction.determineNonDeletedJobIdsWithoutLiveStats;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.elasticsearch.xpack.ml.action.TransportGetJobsStatsAction.determineJobIdsWithoutLiveStats;

public class TransportGetJobsStatsActionTests extends ESTestCase {

public void testDetermineJobIds() {

MlMetadata mlMetadata = mock(MlMetadata.class);
when(mlMetadata.isJobDeleted(eq("id4"))).thenReturn(true);

List<String> result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata,
Collections.singletonList("id1"), Collections.emptyList());
List<String> result = determineJobIdsWithoutLiveStats(Collections.singletonList("id1"), Collections.emptyList());
assertEquals(1, result.size());
assertEquals("id1", result.get(0));

result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata,
Collections.singletonList("id1"), Collections.singletonList(
result = determineJobIdsWithoutLiveStats(Collections.singletonList("id1"), Collections.singletonList(
new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, null, JobState.OPENED, null, null, null)));
assertEquals(0, result.size());

result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata,
Arrays.asList("id1", "id2", "id3"), Collections.emptyList());
result = determineJobIdsWithoutLiveStats(Arrays.asList("id1", "id2", "id3"), Collections.emptyList());
assertEquals(3, result.size());
assertEquals("id1", result.get(0));
assertEquals("id2", result.get(1));
assertEquals("id3", result.get(2));

result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata,
Arrays.asList("id1", "id2", "id3"),
result = determineJobIdsWithoutLiveStats(Arrays.asList("id1", "id2", "id3"),
Collections.singletonList(new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, null,
JobState.CLOSED, null, null, null))
);
assertEquals(2, result.size());
assertEquals("id2", result.get(0));
assertEquals("id3", result.get(1));

result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata,
Arrays.asList("id1", "id2", "id3"), Arrays.asList(
result = determineJobIdsWithoutLiveStats(Arrays.asList("id1", "id2", "id3"), Arrays.asList(
new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, null, JobState.OPENED, null, null, null),
new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, null, JobState.OPENED, null, null, null)
));
assertEquals(1, result.size());
assertEquals("id2", result.get(0));

result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata, Arrays.asList("id1", "id2", "id3"), Arrays.asList(
result = determineJobIdsWithoutLiveStats(Arrays.asList("id1", "id2", "id3"), Arrays.asList(
new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, null, JobState.OPENED, null, null, null),
new GetJobsStatsAction.Response.JobStats("id2", new DataCounts("id2"), null, null, JobState.OPENED, null, null, null),
new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, null, JobState.OPENED, null, null, null)));
assertEquals(0, result.size());

// No jobs running, but job 4 is being deleted
result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata,
Arrays.asList("id1", "id2", "id3", "id4"), Collections.emptyList());
assertEquals(3, result.size());
assertEquals("id1", result.get(0));
assertEquals("id2", result.get(1));
assertEquals("id3", result.get(2));
}

public void testDurationToTimeValue() {
Expand Down