Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Adjust finalize job action to work with documents #34226

Merged
merged 13 commits into from
Oct 16, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public static Set<String> openJobIds(PersistentTasksCustomMetaData tasks) {
* Is there an ml anomaly detector job task for the job {@code jobId}?
* @param jobId The job id
* @param tasks Persistent tasks
* @return
* @return True if the job has a task
*/
public static boolean taskExistsForJob(String jobId, PersistentTasksCustomMetaData tasks) {
return openJobIds(tasks).contains(jobId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public static UpdateJobAction.Request parseRequest(String jobId, XContentParser

/** Indicates an update that was not triggered by a user */
private boolean isInternal;
private boolean waitForAck = true;

public Request(String jobId, JobUpdate update) {
this(jobId, update, false);
Expand Down Expand Up @@ -88,14 +87,6 @@ public boolean isInternal() {
return isInternal;
}

public boolean isWaitForAck() {
return waitForAck;
}

public void setWaitForAck(boolean waitForAck) {
this.waitForAck = waitForAck;
}

@Override
public ActionRequestValidationException validate() {
return null;
Expand All @@ -111,11 +102,6 @@ public void readFrom(StreamInput in) throws IOException {
} else {
isInternal = false;
}
if (in.getVersion().onOrAfter(Version.V_6_3_0)) {
waitForAck = in.readBoolean();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to still read from the wire using something like this?

    // TODO change CURRENT to specific version when feature branch is merged
    if (in.getVersion().onOrAfter(Version.V_6_3_0) && in.getVersion().before(Version.CURRENT)) {
        in.readBoolean(); // was waitForAck
    }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know what I was thinking I should have reviewed this before asking for a review sorry. I pushed 2 more commits one addresses this and the other adds the finalize job action back into the security tests.

} else {
waitForAck = true;
}
}

@Override
Expand All @@ -126,9 +112,6 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_6_2_2)) {
out.writeBoolean(isInternal);
}
if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
out.writeBoolean(waitForAck);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to read, if writing to a version in between 6.3.0 and the version the feature branch gets merged into should this write true so the wire format is as expected on the remote end?

}
}

@Override
Expand All @@ -144,8 +127,7 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
UpdateJobAction.Request that = (UpdateJobAction.Request) o;
return Objects.equals(jobId, that.jobId) &&
Objects.equals(update, that.update) &&
isInternal == that.isInternal;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The isInternal data member hasn't been removed - is it correct to remove it here? (If it is for some reason then it should also be removed from hashCode().)

Objects.equals(update, that.update);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
INTERNAL_PARSER.declareString(Builder::setModelSnapshotId, Job.MODEL_SNAPSHOT_ID);
INTERNAL_PARSER.declareLong(Builder::setEstablishedModelMemory, Job.ESTABLISHED_MODEL_MEMORY);
INTERNAL_PARSER.declareString(Builder::setJobVersion, Job.JOB_VERSION);
INTERNAL_PARSER.declareBoolean(Builder::setClearJobFinishTime, CLEAR_JOB_FINISH_TIME);
INTERNAL_PARSER.declareBoolean(Builder::setClearFinishTime, CLEAR_JOB_FINISH_TIME);
}

private final String jobId;
Expand Down Expand Up @@ -710,7 +710,7 @@ public Builder setJobVersion(String version) {
return this;
}

public Builder setClearJobFinishTime(boolean clearJobFinishTime) {
public Builder setClearFinishTime(boolean clearJobFinishTime) {
this.clearJobFinishTime = clearJobFinishTime;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ protected UpdateJobAction.Request createTestInstance() {
JobUpdate.Builder jobUpdate = new JobUpdate.Builder(jobId);
jobUpdate.setAnalysisLimits(new AnalysisLimits(100L, 100L));
UpdateJobAction.Request request = new UpdateJobAction.Request(jobId, jobUpdate.build());
request.setWaitForAck(randomBoolean());
return request;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public JobUpdate createRandom(String jobId, @Nullable Job job) {
update.setJobVersion(randomFrom(Version.CURRENT, Version.V_6_2_0, Version.V_6_1_0));
}
if (useInternalParser) {
update.setClearJobFinishTime(randomBoolean());
update.setClearFinishTime(randomBoolean());
}

return update.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.elasticsearch.xpack.core.ml.action.DeleteFilterAction;
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
import org.elasticsearch.xpack.core.ml.action.FlushJobAction;
import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
import org.elasticsearch.xpack.core.ml.action.GetCategoriesAction;
Expand Down Expand Up @@ -617,7 +616,6 @@ public void testMachineLearningAdminRole() {
assertThat(role.cluster().check(DeleteFilterAction.NAME, request), is(true));
assertThat(role.cluster().check(DeleteJobAction.NAME, request), is(true));
assertThat(role.cluster().check(DeleteModelSnapshotAction.NAME, request), is(true));
assertThat(role.cluster().check(FinalizeJobExecutionAction.NAME, request), is(false)); // internal use only
assertThat(role.cluster().check(FlushJobAction.NAME, request), is(true));
assertThat(role.cluster().check(GetBucketsAction.NAME, request), is(true));
assertThat(role.cluster().check(GetCategoriesAction.NAME, request), is(true));
Expand Down Expand Up @@ -669,7 +667,6 @@ public void testMachineLearningUserRole() {
assertThat(role.cluster().check(DeleteFilterAction.NAME, request), is(false));
assertThat(role.cluster().check(DeleteJobAction.NAME, request), is(false));
assertThat(role.cluster().check(DeleteModelSnapshotAction.NAME, request), is(false));
assertThat(role.cluster().check(FinalizeJobExecutionAction.NAME, request), is(false));
assertThat(role.cluster().check(FlushJobAction.NAME, request), is(false));
assertThat(role.cluster().check(GetBucketsAction.NAME, request), is(true));
assertThat(role.cluster().check(GetCategoriesAction.NAME, request), is(true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
Expand All @@ -50,9 +49,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

public class TransportCloseJobAction extends TransportTasksAction<TransportOpenJobAction.JobTask, CloseJobAction.Request,
CloseJobAction.Response, CloseJobAction.Response> {

Expand Down Expand Up @@ -427,10 +423,7 @@ void waitForJobClosed(CloseJobAction.Request request, WaitForCloseRequest waitFo
}, request.getCloseTimeout(), new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean result) {
FinalizeJobExecutionAction.Request finalizeRequest = new FinalizeJobExecutionAction.Request(
waitForCloseRequest.jobsToFinalize.toArray(new String[0]));
executeAsyncWithOrigin(client, ML_ORIGIN, FinalizeJobExecutionAction.INSTANCE, finalizeRequest,
ActionListener.wrap(r -> listener.onResponse(response), listener::onFailure));
listener.onResponse(response);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,15 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
import org.elasticsearch.xpack.core.ml.job.config.Job;

import java.util.Date;

public class TransportFinalizeJobExecutionAction extends TransportMasterNodeAction<FinalizeJobExecutionAction.Request,
AcknowledgedResponse> {
Expand All @@ -51,41 +44,10 @@ protected AcknowledgedResponse newResponse() {

@Override
protected void masterOperation(FinalizeJobExecutionAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) throws Exception {
String jobIdString = String.join(",", request.getJobIds());
String source = "finalize_job_execution [" + jobIdString + "]";
logger.debug("finalizing jobs [{}]", jobIdString);
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
XPackPlugin.checkReadyForXPackCustomMetadata(currentState);
MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState);
MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata);
Date finishedTime = new Date();

for (String jobId : request.getJobIds()) {
Job.Builder jobBuilder = new Job.Builder(mlMetadata.getJobs().get(jobId));
jobBuilder.setFinishedTime(finishedTime);
mlMetadataBuilder.putJob(jobBuilder.build(), true);
}
ClusterState.Builder builder = ClusterState.builder(currentState);
return builder.metaData(new MetaData.Builder(currentState.metaData())
.putCustom(MlMetadata.TYPE, mlMetadataBuilder.build()))
.build();
}

@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState,
ClusterState newState) {
logger.debug("finalized job [{}]", jobIdString);
listener.onResponse(new AcknowledgedResponse(true));
}
});
ActionListener<AcknowledgedResponse> listener) {
// This action is no longer required but needs to be preserved
// in case it is called by an old node in a mixed cluster
listener.onResponse(new AcknowledgedResponse(true));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ public void onTimeout(TimeValue timeout) {
}

private void clearJobFinishedTime(String jobId, ActionListener<AcknowledgedResponse> listener) {
JobUpdate update = new JobUpdate.Builder(jobId).setClearJobFinishTime(true).build();
JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build();

jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap(
job -> listener.onResponse(new AcknowledgedResponse(true)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,9 @@ public void onFailure(Exception e) {
*
* @param jobId The Id of the job to update
* @param update The job update
* @param maxModelMemoryLimit The maximum model memory allowed
* @param maxModelMemoryLimit The maximum model memory allowed. This can be {@code null}
* if the job's {@link org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits}
* are not changed.
* @param updatedJobListener Updated job listener
*/
public void updateJob(String jobId, JobUpdate update, ByteSizeValue maxModelMemoryLimit, ActionListener<Job> updatedJobListener) {
Expand Down Expand Up @@ -373,7 +375,6 @@ private void indexUpdatedJob(Job updatedJob, long version, ActionListener<Job> u
}
}


/**
* Check a job exists. A job exists if it has a configuration document.
* If the .ml-config index does not exist it is treated as a missing job
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,10 @@ public void persistQuantiles(Quantiles quantiles, WriteRequest.RefreshPolicy ref
/**
* Persist a model snapshot description
*/
public void persistModelSnapshot(ModelSnapshot modelSnapshot, WriteRequest.RefreshPolicy refreshPolicy) {
public IndexResponse persistModelSnapshot(ModelSnapshot modelSnapshot, WriteRequest.RefreshPolicy refreshPolicy) {
Persistable persistable = new Persistable(modelSnapshot.getJobId(), modelSnapshot, ModelSnapshot.documentId(modelSnapshot));
persistable.setRefreshPolicy(refreshPolicy);
persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(modelSnapshot.getJobId())).actionGet();
return persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(modelSnapshot.getJobId())).actionGet();
}

/**
Expand Down
Loading