From 9230a48722533b1ac523d3f88927494b9acec0a0 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Fri, 7 Sep 2018 07:04:27 -0500 Subject: [PATCH] HLRC: ML Post Data (#33443) * HLRC: ML Post data --- .../client/MLRequestConverters.java | 35 +++ .../client/MachineLearningClient.java | 48 ++++ .../client/ml/PostDataRequest.java | 229 ++++++++++++++++++ .../client/ml/PostDataResponse.java | 74 ++++++ .../client/MLRequestConvertersTests.java | 32 +++ .../client/MachineLearningIT.java | 30 ++- .../MlClientDocumentationIT.java | 70 ++++++ .../client/ml/PostDataRequestTests.java | 90 +++++++ .../client/ml/PostDataResponseTests.java | 43 ++++ .../high-level/ml/post-data.asciidoc | 86 +++++++ .../high-level/supported-apis.asciidoc | 2 + 11 files changed, 736 insertions(+), 3 deletions(-) create mode 100644 client/rest-high-level/src/main/java/org/elasticsearch/client/ml/PostDataRequest.java create mode 100644 client/rest-high-level/src/main/java/org/elasticsearch/client/ml/PostDataResponse.java create mode 100644 client/rest-high-level/src/test/java/org/elasticsearch/client/ml/PostDataRequestTests.java create mode 100644 client/rest-high-level/src/test/java/org/elasticsearch/client/ml/PostDataResponseTests.java create mode 100644 docs/java-rest/high-level/ml/post-data.asciidoc diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java index b8d977d8eeb94..cbf653a713d39 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java @@ -19,10 +19,13 @@ package org.elasticsearch.client; +import org.apache.http.HttpEntity; import org.apache.http.client.methods.HttpDelete; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpPut; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.lucene.util.BytesRef; import org.elasticsearch.client.RequestConverters.EndpointBuilder; import org.elasticsearch.client.ml.CloseJobRequest; import org.elasticsearch.client.ml.DeleteJobRequest; @@ -34,13 +37,16 @@ import org.elasticsearch.client.ml.GetOverallBucketsRequest; import org.elasticsearch.client.ml.GetRecordsRequest; import org.elasticsearch.client.ml.OpenJobRequest; +import org.elasticsearch.client.ml.PostDataRequest; import org.elasticsearch.client.ml.PutJobRequest; import org.elasticsearch.client.ml.UpdateJobRequest; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; import java.io.IOException; import static org.elasticsearch.client.RequestConverters.REQUEST_BODY_CONTENT_TYPE; +import static org.elasticsearch.client.RequestConverters.createContentType; import static org.elasticsearch.client.RequestConverters.createEntity; final class MLRequestConverters { @@ -202,6 +208,35 @@ static Request getRecords(GetRecordsRequest getRecordsRequest) throws IOExceptio return request; } + static Request postData(PostDataRequest postDataRequest) throws IOException { + String endpoint = new EndpointBuilder() + .addPathPartAsIs("_xpack") + .addPathPartAsIs("ml") + .addPathPartAsIs("anomaly_detectors") + .addPathPart(postDataRequest.getJobId()) + .addPathPartAsIs("_data") + .build(); + Request request = new Request(HttpPost.METHOD_NAME, endpoint); + + RequestConverters.Params params = new RequestConverters.Params(request); + if (postDataRequest.getResetStart() != null) { + params.putParam(PostDataRequest.RESET_START.getPreferredName(), postDataRequest.getResetStart()); + } + if (postDataRequest.getResetEnd() != null) { + params.putParam(PostDataRequest.RESET_END.getPreferredName(), postDataRequest.getResetEnd()); + } + BytesReference content = postDataRequest.getContent(); + if (content != null) { + BytesRef source = postDataRequest.getContent().toBytesRef(); + HttpEntity byteEntity = new ByteArrayEntity(source.bytes, + source.offset, + source.length, + createContentType(postDataRequest.getXContentType())); + request.setEntity(byteEntity); + } + return request; + } + static Request getInfluencers(GetInfluencersRequest getInfluencersRequest) throws IOException { String endpoint = new EndpointBuilder() .addPathPartAsIs("_xpack") diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java index bdfc34ad997d6..6e54b9259865f 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java @@ -19,6 +19,8 @@ package org.elasticsearch.client; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.ml.PostDataRequest; +import org.elasticsearch.client.ml.PostDataResponse; import org.elasticsearch.client.ml.UpdateJobRequest; import org.elasticsearch.client.ml.CloseJobRequest; import org.elasticsearch.client.ml.CloseJobResponse; @@ -501,6 +503,52 @@ public void getRecordsAsync(GetRecordsRequest request, RequestOptions options, A Collections.emptySet()); } + /** + * Sends data to an anomaly detection job for analysis. + * + * NOTE: The job must have a state of open to receive and process the data. + * + *

+ * For additional info + * see ML POST Data documentation + *

+ * + * @param request PostDataRequest containing the data to post and some additional options + * @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return response containing operational progress about the job + * @throws IOException when there is a serialization issue sending the request or receiving the response + */ + public PostDataResponse postData(PostDataRequest request, RequestOptions options) throws IOException { + return restHighLevelClient.performRequestAndParseEntity(request, + MLRequestConverters::postData, + options, + PostDataResponse::fromXContent, + Collections.emptySet()); + } + + /** + * Sends data to an anomaly detection job for analysis, asynchronously + * + * NOTE: The job must have a state of open to receive and process the data. + * + *

+ * For additional info + * see ML POST Data documentation + *

+ * + * @param request PostDataRequest containing the data to post and some additional options + * @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @param listener Listener to be notified upon request completion + */ + public void postDataAsync(PostDataRequest request, RequestOptions options, ActionListener listener) { + restHighLevelClient.performRequestAsyncAndParseEntity(request, + MLRequestConverters::postData, + options, + PostDataResponse::fromXContent, + listener, + Collections.emptySet()); + } + /** * Gets the influencers for a Machine Learning Job. *

diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/PostDataRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/PostDataRequest.java new file mode 100644 index 0000000000000..cc015fc4837e2 --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/PostDataRequest.java @@ -0,0 +1,229 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client.ml; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.client.ml.job.config.Job; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * POJO for posting data to a Machine Learning job + */ +public class PostDataRequest extends ActionRequest implements ToXContentObject { + + public static final ParseField RESET_START = new ParseField("reset_start"); + public static final ParseField RESET_END = new ParseField("reset_end"); + public static final ParseField CONTENT_TYPE = new ParseField("content_type"); + + public static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>("post_data_request", + (a) -> new PostDataRequest((String)a[0], XContentType.fromMediaTypeOrFormat((String)a[1]), new byte[0])); + + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID); + PARSER.declareString(ConstructingObjectParser.constructorArg(), CONTENT_TYPE); + PARSER.declareStringOrNull(PostDataRequest::setResetEnd, RESET_END); + PARSER.declareStringOrNull(PostDataRequest::setResetStart, RESET_START); + } + + private final String jobId; + private final XContentType xContentType; + private final BytesReference content; + private String resetStart; + private String resetEnd; + + /** + * Create a new PostDataRequest object + * + * @param jobId non-null jobId of the job to post data to + * @param xContentType content type of the data to post. Only {@link XContentType#JSON} or {@link XContentType#SMILE} are supported + * @param content bulk serialized content in the format of the passed {@link XContentType} + */ + public PostDataRequest(String jobId, XContentType xContentType, BytesReference content) { + this.jobId = Objects.requireNonNull(jobId, "job_id must not be null"); + this.xContentType = Objects.requireNonNull(xContentType, "content_type must not be null"); + this.content = Objects.requireNonNull(content, "content must not be null"); + } + + /** + * Create a new PostDataRequest object referencing the passed {@code byte[]} content + * + * @param jobId non-null jobId of the job to post data to + * @param xContentType content type of the data to post. Only {@link XContentType#JSON} or {@link XContentType#SMILE} are supported + * @param content bulk serialized content in the format of the passed {@link XContentType} + */ + public PostDataRequest(String jobId, XContentType xContentType, byte[] content) { + this(jobId, xContentType, new BytesArray(content)); + } + + /** + * Create a new PostDataRequest object referencing the passed {@link JsonBuilder} object + * + * @param jobId non-null jobId of the job to post data to + * @param builder {@link JsonBuilder} object containing documents to be serialized and sent in {@link XContentType#JSON} format + */ + public PostDataRequest(String jobId, JsonBuilder builder) { + this(jobId, XContentType.JSON, builder.build()); + } + + public String getJobId() { + return jobId; + } + + public String getResetStart() { + return resetStart; + } + + /** + * Specifies the start of the bucket resetting range + * + * @param resetStart String representation of a timestamp; may be an epoch seconds, epoch millis or an ISO 8601 string + */ + public void setResetStart(String resetStart) { + this.resetStart = resetStart; + } + + public String getResetEnd() { + return resetEnd; + } + + /** + * Specifies the end of the bucket resetting range + * + * @param resetEnd String representation of a timestamp; may be an epoch seconds, epoch millis or an ISO 8601 string + */ + public void setResetEnd(String resetEnd) { + this.resetEnd = resetEnd; + } + + public BytesReference getContent() { + return content; + } + + public XContentType getXContentType() { + return xContentType; + } + + @Override + public int hashCode() { + //We leave out the content for server side parity + return Objects.hash(jobId, resetStart, resetEnd, xContentType); + } + + @Override + public boolean equals(Object obj) { + if(obj == this) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + //We leave out the content for server side parity + PostDataRequest other = (PostDataRequest) obj; + return Objects.equals(jobId, other.jobId) && + Objects.equals(resetStart, other.resetStart) && + Objects.equals(resetEnd, other.resetEnd) && + Objects.equals(xContentType, other.xContentType); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(Job.ID.getPreferredName(), jobId); + builder.field(CONTENT_TYPE.getPreferredName(), xContentType.mediaType()); + if (resetEnd != null) { + builder.field(RESET_END.getPreferredName(), resetEnd); + } + if (resetStart != null) { + builder.field(RESET_START.getPreferredName(), resetStart); + } + builder.endObject(); + return builder; + } + + /** + * Class for incrementally building a bulk document request in {@link XContentType#JSON} format + */ + public static class JsonBuilder { + + private final List bytes = new ArrayList<>(); + + /** + * Add a document via a {@code byte[]} array + * + * @param doc {@code byte[]} array of a serialized JSON object + */ + public JsonBuilder addDoc(byte[] doc) { + bytes.add(ByteBuffer.wrap(doc)); + return this; + } + + /** + * Add a document via a serialized JSON String + * + * @param doc a serialized JSON String + */ + public JsonBuilder addDoc(String doc) { + bytes.add(ByteBuffer.wrap(doc.getBytes(StandardCharsets.UTF_8))); + return this; + } + + /** + * Add a document via an object map + * + * @param doc document object to add to bulk request + * @throws IOException on parsing/serialization errors + */ + public JsonBuilder addDoc(Map doc) throws IOException { + try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) { + builder.map(doc); + bytes.add(ByteBuffer.wrap(BytesReference.toBytes(BytesReference.bytes(builder)))); + } + return this; + } + + private BytesReference build() { + ByteBuffer[] buffers = bytes.toArray(new ByteBuffer[bytes.size()]); + return BytesReference.fromByteBuffers(buffers); + } + + } +} diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/PostDataResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/PostDataResponse.java new file mode 100644 index 0000000000000..ce99316e90c76 --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/PostDataResponse.java @@ -0,0 +1,74 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client.ml; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.client.ml.job.process.DataCounts; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Objects; + +/** + * Response object when posting data to a Machine Learning Job + */ +public class PostDataResponse extends ActionResponse implements ToXContentObject { + + private DataCounts dataCounts; + + public static PostDataResponse fromXContent(XContentParser parser) throws IOException { + return new PostDataResponse(DataCounts.PARSER.parse(parser, null)); + } + + public PostDataResponse(DataCounts counts) { + this.dataCounts = counts; + } + + public DataCounts getDataCounts() { + return dataCounts; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + return dataCounts.toXContent(builder, params); + } + + @Override + public int hashCode() { + return Objects.hashCode(dataCounts); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + PostDataResponse other = (PostDataResponse) obj; + return Objects.equals(dataCounts, other.dataCounts); + } + +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java index f1b035566aa4d..e0f20e2f23c83 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java @@ -33,6 +33,7 @@ import org.elasticsearch.client.ml.GetOverallBucketsRequest; import org.elasticsearch.client.ml.GetRecordsRequest; import org.elasticsearch.client.ml.OpenJobRequest; +import org.elasticsearch.client.ml.PostDataRequest; import org.elasticsearch.client.ml.PutJobRequest; import org.elasticsearch.client.ml.UpdateJobRequest; import org.elasticsearch.client.ml.job.config.AnalysisConfig; @@ -43,12 +44,15 @@ import org.elasticsearch.client.ml.job.util.PageParams; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.test.ESTestCase; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import static org.hamcrest.Matchers.equalTo; @@ -238,6 +242,34 @@ public void testGetRecords() throws IOException { } } + public void testPostData() throws Exception { + String jobId = randomAlphaOfLength(10); + PostDataRequest.JsonBuilder jsonBuilder = new PostDataRequest.JsonBuilder(); + Map obj = new HashMap<>(); + obj.put("foo", "bar"); + jsonBuilder.addDoc(obj); + + PostDataRequest postDataRequest = new PostDataRequest(jobId, jsonBuilder); + Request request = MLRequestConverters.postData(postDataRequest); + + assertEquals(HttpPost.METHOD_NAME, request.getMethod()); + assertEquals("/_xpack/ml/anomaly_detectors/" + jobId + "/_data", request.getEndpoint()); + assertEquals("{\"foo\":\"bar\"}", requestEntityToString(request)); + assertEquals(postDataRequest.getXContentType().mediaTypeWithoutParameters(), request.getEntity().getContentType().getValue()); + assertFalse(request.getParameters().containsKey(PostDataRequest.RESET_END.getPreferredName())); + assertFalse(request.getParameters().containsKey(PostDataRequest.RESET_START.getPreferredName())); + + PostDataRequest postDataRequest2 = new PostDataRequest(jobId, XContentType.SMILE, new byte[0]); + postDataRequest2.setResetStart("2018-08-08T00:00:00Z"); + postDataRequest2.setResetEnd("2018-09-08T00:00:00Z"); + + request = MLRequestConverters.postData(postDataRequest2); + + assertEquals(postDataRequest2.getXContentType().mediaTypeWithoutParameters(), request.getEntity().getContentType().getValue()); + assertEquals("2018-09-08T00:00:00Z", request.getParameters().get(PostDataRequest.RESET_END.getPreferredName())); + assertEquals("2018-08-08T00:00:00Z", request.getParameters().get(PostDataRequest.RESET_START.getPreferredName())); + } + public void testGetInfluencers() throws IOException { String jobId = randomAlphaOfLength(10); GetInfluencersRequest getInfluencersRequest = new GetInfluencersRequest(jobId); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java index bf25d9d1c0fb3..93019ba0d43e0 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java @@ -20,6 +20,8 @@ import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.client.ml.PostDataRequest; +import org.elasticsearch.client.ml.PostDataResponse; import org.elasticsearch.client.ml.UpdateJobRequest; import org.elasticsearch.client.ml.job.config.JobUpdate; import org.elasticsearch.common.unit.TimeValue; @@ -41,13 +43,14 @@ import org.elasticsearch.client.ml.job.config.DataDescription; import org.elasticsearch.client.ml.job.config.Detector; import org.elasticsearch.client.ml.job.config.Job; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.client.ml.FlushJobRequest; import org.elasticsearch.client.ml.FlushJobResponse; import org.junit.After; import java.io.IOException; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -220,6 +223,27 @@ public void testGetJobStats() throws Exception { assertThat(exception.status().getStatus(), equalTo(404)); } + public void testPostData() throws Exception { + String jobId = randomValidJobId(); + Job job = buildJob(jobId); + MachineLearningClient machineLearningClient = highLevelClient().machineLearning(); + machineLearningClient.putJob(new PutJobRequest(job), RequestOptions.DEFAULT); + machineLearningClient.openJob(new OpenJobRequest(jobId), RequestOptions.DEFAULT); + + PostDataRequest.JsonBuilder builder = new PostDataRequest.JsonBuilder(); + for(int i = 0; i < 10; i++) { + Map hashMap = new HashMap<>(); + hashMap.put("total", randomInt(1000)); + hashMap.put("timestamp", (i+1)*1000); + builder.addDoc(hashMap); + } + PostDataRequest postDataRequest = new PostDataRequest(jobId, builder); + + PostDataResponse response = execute(postDataRequest, machineLearningClient::postData, machineLearningClient::postDataAsync); + assertEquals(10, response.getDataCounts().getInputRecordCount()); + assertEquals(0, response.getDataCounts().getOutOfOrderTimeStampCount()); + } + public void testUpdateJob() throws Exception { String jobId = randomValidJobId(); Job job = buildJob(jobId); @@ -256,8 +280,8 @@ public static Job buildJob(String jobId) { builder.setAnalysisConfig(configBuilder); DataDescription.Builder dataDescription = new DataDescription.Builder(); - dataDescription.setTimeFormat(randomFrom(DataDescription.EPOCH_MS, DataDescription.EPOCH)); - dataDescription.setTimeField(randomAlphaOfLength(10)); + dataDescription.setTimeFormat(DataDescription.EPOCH_MS); + dataDescription.setTimeField("timestamp"); builder.setDataDescription(dataDescription); return builder.build(); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java index ac7835735fcf1..bc452ad8503f7 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java @@ -49,6 +49,8 @@ import org.elasticsearch.client.ml.GetRecordsResponse; import org.elasticsearch.client.ml.OpenJobRequest; import org.elasticsearch.client.ml.OpenJobResponse; +import org.elasticsearch.client.ml.PostDataRequest; +import org.elasticsearch.client.ml.PostDataResponse; import org.elasticsearch.client.ml.PutJobRequest; import org.elasticsearch.client.ml.PutJobResponse; import org.elasticsearch.client.ml.UpdateJobRequest; @@ -58,6 +60,7 @@ import org.elasticsearch.client.ml.job.config.DetectionRule; import org.elasticsearch.client.ml.job.config.Detector; import org.elasticsearch.client.ml.job.config.Job; +import org.elasticsearch.client.ml.job.process.DataCounts; import org.elasticsearch.client.ml.job.config.JobUpdate; import org.elasticsearch.client.ml.job.config.ModelPlotConfig; import org.elasticsearch.client.ml.job.config.Operator; @@ -882,6 +885,73 @@ public void onFailure(Exception e) { } } + public void testPostData() throws Exception { + RestHighLevelClient client = highLevelClient(); + + Job job = MachineLearningIT.buildJob("test-post-data"); + client.machineLearning().putJob(new PutJobRequest(job), RequestOptions.DEFAULT); + client.machineLearning().openJob(new OpenJobRequest(job.getId()), RequestOptions.DEFAULT); + + { + //tag::x-pack-ml-post-data-request + PostDataRequest.JsonBuilder jsonBuilder = new PostDataRequest.JsonBuilder(); //<1> + Map mapData = new HashMap<>(); + mapData.put("total", 109); + jsonBuilder.addDoc(mapData); //<2> + jsonBuilder.addDoc("{\"total\":1000}"); //<3> + PostDataRequest postDataRequest = new PostDataRequest("test-post-data", jsonBuilder); //<4> + //end::x-pack-ml-post-data-request + + + //tag::x-pack-ml-post-data-request-options + postDataRequest.setResetStart("2018-08-31T16:35:07+00:00"); //<1> + postDataRequest.setResetEnd("2018-08-31T16:35:17+00:00"); //<2> + //end::x-pack-ml-post-data-request-options + postDataRequest.setResetEnd(null); + postDataRequest.setResetStart(null); + + //tag::x-pack-ml-post-data-execute + PostDataResponse postDataResponse = client.machineLearning().postData(postDataRequest, RequestOptions.DEFAULT); + //end::x-pack-ml-post-data-execute + + //tag::x-pack-ml-post-data-response + DataCounts dataCounts = postDataResponse.getDataCounts(); //<1> + //end::x-pack-ml-post-data-response + assertEquals(2, dataCounts.getInputRecordCount()); + + } + { + //tag::x-pack-ml-post-data-listener + ActionListener listener = new ActionListener() { + @Override + public void onResponse(PostDataResponse postDataResponse) { + //<1> + } + + @Override + public void onFailure(Exception e) { + // <2> + } + }; + //end::x-pack-ml-post-data-listener + PostDataRequest.JsonBuilder jsonBuilder = new PostDataRequest.JsonBuilder(); + Map mapData = new HashMap<>(); + mapData.put("total", 109); + jsonBuilder.addDoc(mapData); + PostDataRequest postDataRequest = new PostDataRequest("test-post-data", jsonBuilder); //<1> + + // Replace the empty listener by a blocking listener in test + final CountDownLatch latch = new CountDownLatch(1); + listener = new LatchedActionListener<>(listener, latch); + + // tag::x-pack-ml-post-data-execute-async + client.machineLearning().postDataAsync(postDataRequest, RequestOptions.DEFAULT, listener); //<1> + // end::x-pack-ml-post-data-execute-async + + assertTrue(latch.await(30L, TimeUnit.SECONDS)); + } + } + public void testGetInfluencers() throws IOException, InterruptedException { RestHighLevelClient client = highLevelClient(); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/PostDataRequestTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/PostDataRequestTests.java new file mode 100644 index 0000000000000..363d37c3ca4a0 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/PostDataRequestTests.java @@ -0,0 +1,90 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client.ml; + +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.test.AbstractXContentTestCase; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + + +public class PostDataRequestTests extends AbstractXContentTestCase { + + @Override + protected PostDataRequest createTestInstance() { + String jobId = randomAlphaOfLength(10); + XContentType contentType = randomFrom(XContentType.JSON, XContentType.SMILE); + + PostDataRequest request = new PostDataRequest(jobId, contentType, new byte[0]); + if (randomBoolean()) { + request.setResetEnd(randomAlphaOfLength(10)); + } + if (randomBoolean()) { + request.setResetStart(randomAlphaOfLength(10)); + } + + return request; + } + + @Override + protected PostDataRequest doParseInstance(XContentParser parser) { + return PostDataRequest.PARSER.apply(parser, null); + } + + @Override + protected boolean supportsUnknownFields() { + return false; + } + + public void testJsonBuilder() throws IOException { + + String jobId = randomAlphaOfLength(10); + PostDataRequest.JsonBuilder builder = new PostDataRequest.JsonBuilder(); + + Map obj1 = new HashMap<>(); + obj1.put("entry1", "value1"); + obj1.put("entry2", "value2"); + builder.addDoc(obj1); + + builder.addDoc("{\"entry3\":\"value3\"}"); + builder.addDoc("{\"entry4\":\"value4\"}".getBytes(StandardCharsets.UTF_8)); + + PostDataRequest request = new PostDataRequest(jobId, builder); + + assertEquals("{\"entry1\":\"value1\",\"entry2\":\"value2\"}{\"entry3\":\"value3\"}{\"entry4\":\"value4\"}", + request.getContent().utf8ToString()); + assertEquals(XContentType.JSON, request.getXContentType()); + assertEquals(jobId, request.getJobId()); + } + + public void testFromByteArray() { + String jobId = randomAlphaOfLength(10); + PostDataRequest request = new PostDataRequest(jobId, + XContentType.JSON, + "{\"others\":{\"foo\":100}}".getBytes(StandardCharsets.UTF_8)); + + assertEquals("{\"others\":{\"foo\":100}}", request.getContent().utf8ToString()); + assertEquals(XContentType.JSON, request.getXContentType()); + assertEquals(jobId, request.getJobId()); + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/PostDataResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/PostDataResponseTests.java new file mode 100644 index 0000000000000..fc74040cc407c --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/PostDataResponseTests.java @@ -0,0 +1,43 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client.ml; + +import org.elasticsearch.client.ml.job.process.DataCountsTests; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractXContentTestCase; + +import java.io.IOException; + +public class PostDataResponseTests extends AbstractXContentTestCase { + + @Override + protected PostDataResponse createTestInstance() { + return new PostDataResponse(DataCountsTests.createTestInstance(randomAlphaOfLength(10))); + } + + @Override + protected PostDataResponse doParseInstance(XContentParser parser) throws IOException { + return PostDataResponse.fromXContent(parser); + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } +} diff --git a/docs/java-rest/high-level/ml/post-data.asciidoc b/docs/java-rest/high-level/ml/post-data.asciidoc new file mode 100644 index 0000000000000..2c8ca8f18a384 --- /dev/null +++ b/docs/java-rest/high-level/ml/post-data.asciidoc @@ -0,0 +1,86 @@ +[[java-rest-high-x-pack-ml-post-data]] +=== Post Data API + +The Post Data API provides the ability to post data to an open + {ml} job in the cluster. +It accepts a `PostDataRequest` object and responds +with a `PostDataResponse` object. + +[[java-rest-high-x-pack-ml-post-data-request]] +==== Post Data Request + +A `PostDataRequest` object gets created with an existing non-null `jobId` +and the `XContentType` being sent. Individual docs can be added +incrementally via the `PostDataRequest.JsonBuilder#addDoc` method. +These are then serialized and sent in bulk when passed to the `PostDataRequest`. + +Alternatively, the serialized bulk content can be set manually, along with its `XContentType` +through one of the other `PostDataRequest` constructors. + +Only `XContentType.JSON` and `XContentType.SMILE` are supported. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-post-data-request] +-------------------------------------------------- +<1> Create a new `PostDataRequest.JsonBuilder` object for incrementally adding documents +<2> Add a new document as a `Map` object +<3> Add a new document as a serialized JSON formatted String. +<4> Constructing a new request referencing an opened `jobId`, and a JsonBuilder + +==== Optional Arguments + +The following arguments are optional. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-post-data-request-options] +-------------------------------------------------- +<1> Set the start of the bucket resetting time +<2> Set the end of the bucket resetting time + +[[java-rest-high-x-pack-ml-post-data-execution]] +==== Execution + +The request can be executed through the `MachineLearningClient` contained +in the `RestHighLevelClient` object, accessed via the `machineLearningClient()` method. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-post-data-execute] +-------------------------------------------------- + +[[java-rest-high-x-pack-ml-post-data-execution-async]] +==== Asynchronous Execution + +The request can also be executed asynchronously: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-post-data-execute-async] +-------------------------------------------------- +<1> The `PostDataRequest` to execute and the `ActionListener` to use when +the execution completes + +The method does not block and returns immediately. The passed `ActionListener` is used +to notify the caller of completion. A typical `ActionListener` for `PostDataResponse` may +look like + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-post-data-listener] +-------------------------------------------------- +<1> `onResponse` is called back when the action is completed successfully +<2> `onFailure` is called back when some unexpected error occurs + +[[java-rest-high-x-pack-ml-post-data-response]] +==== Post Data Response + +A `PostDataResponse` contains current data processing statistics. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-ml-post-data-response] +-------------------------------------------------- +<1> `getDataCounts()` a `DataCounts` object containing the current +data processing counts. diff --git a/docs/java-rest/high-level/supported-apis.asciidoc b/docs/java-rest/high-level/supported-apis.asciidoc index c482c8bccff23..e1335b0effc5d 100644 --- a/docs/java-rest/high-level/supported-apis.asciidoc +++ b/docs/java-rest/high-level/supported-apis.asciidoc @@ -221,6 +221,7 @@ The Java High Level REST Client supports the following Machine Learning APIs: * <> * <> * <> +* <> * <> include::ml/put-job.asciidoc[] @@ -234,6 +235,7 @@ include::ml/get-job-stats.asciidoc[] include::ml/get-buckets.asciidoc[] include::ml/get-overall-buckets.asciidoc[] include::ml/get-records.asciidoc[] +include::ml/post-data.asciidoc[] include::ml/get-influencers.asciidoc[] == Migration APIs