Skip to content

Commit

Permalink
HLRC: ML Post Data (#33443)
Browse files Browse the repository at this point in the history
* HLRC: ML Post data
  • Loading branch information
benwtrent authored Sep 7, 2018
1 parent c12d232 commit 9230a48
Show file tree
Hide file tree
Showing 11 changed files with 736 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@

package org.elasticsearch.client;

import org.apache.http.HttpEntity;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.client.RequestConverters.EndpointBuilder;
import org.elasticsearch.client.ml.CloseJobRequest;
import org.elasticsearch.client.ml.DeleteJobRequest;
Expand All @@ -34,13 +37,16 @@
import org.elasticsearch.client.ml.GetOverallBucketsRequest;
import org.elasticsearch.client.ml.GetRecordsRequest;
import org.elasticsearch.client.ml.OpenJobRequest;
import org.elasticsearch.client.ml.PostDataRequest;
import org.elasticsearch.client.ml.PutJobRequest;
import org.elasticsearch.client.ml.UpdateJobRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;

import java.io.IOException;

import static org.elasticsearch.client.RequestConverters.REQUEST_BODY_CONTENT_TYPE;
import static org.elasticsearch.client.RequestConverters.createContentType;
import static org.elasticsearch.client.RequestConverters.createEntity;

final class MLRequestConverters {
Expand Down Expand Up @@ -202,6 +208,35 @@ static Request getRecords(GetRecordsRequest getRecordsRequest) throws IOExceptio
return request;
}

static Request postData(PostDataRequest postDataRequest) throws IOException {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_xpack")
.addPathPartAsIs("ml")
.addPathPartAsIs("anomaly_detectors")
.addPathPart(postDataRequest.getJobId())
.addPathPartAsIs("_data")
.build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);

RequestConverters.Params params = new RequestConverters.Params(request);
if (postDataRequest.getResetStart() != null) {
params.putParam(PostDataRequest.RESET_START.getPreferredName(), postDataRequest.getResetStart());
}
if (postDataRequest.getResetEnd() != null) {
params.putParam(PostDataRequest.RESET_END.getPreferredName(), postDataRequest.getResetEnd());
}
BytesReference content = postDataRequest.getContent();
if (content != null) {
BytesRef source = postDataRequest.getContent().toBytesRef();
HttpEntity byteEntity = new ByteArrayEntity(source.bytes,
source.offset,
source.length,
createContentType(postDataRequest.getXContentType()));
request.setEntity(byteEntity);
}
return request;
}

static Request getInfluencers(GetInfluencersRequest getInfluencersRequest) throws IOException {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_xpack")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.elasticsearch.client;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.ml.PostDataRequest;
import org.elasticsearch.client.ml.PostDataResponse;
import org.elasticsearch.client.ml.UpdateJobRequest;
import org.elasticsearch.client.ml.CloseJobRequest;
import org.elasticsearch.client.ml.CloseJobResponse;
Expand Down Expand Up @@ -501,6 +503,52 @@ public void getRecordsAsync(GetRecordsRequest request, RequestOptions options, A
Collections.emptySet());
}

/**
* Sends data to an anomaly detection job for analysis.
*
* NOTE: The job must have a state of open to receive and process the data.
*
* <p>
* For additional info
* see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-post-data.html">ML POST Data documentation</a>
* </p>
*
* @param request PostDataRequest containing the data to post and some additional options
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return response containing operational progress about the job
* @throws IOException when there is a serialization issue sending the request or receiving the response
*/
public PostDataResponse postData(PostDataRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request,
MLRequestConverters::postData,
options,
PostDataResponse::fromXContent,
Collections.emptySet());
}

/**
* Sends data to an anomaly detection job for analysis, asynchronously
*
* NOTE: The job must have a state of open to receive and process the data.
*
* <p>
* For additional info
* see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-post-data.html">ML POST Data documentation</a>
* </p>
*
* @param request PostDataRequest containing the data to post and some additional options
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener Listener to be notified upon request completion
*/
public void postDataAsync(PostDataRequest request, RequestOptions options, ActionListener<PostDataResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(request,
MLRequestConverters::postData,
options,
PostDataResponse::fromXContent,
listener,
Collections.emptySet());
}

/**
* Gets the influencers for a Machine Learning Job.
* <p>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.client.ml.job.config.Job;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* POJO for posting data to a Machine Learning job
*/
public class PostDataRequest extends ActionRequest implements ToXContentObject {

public static final ParseField RESET_START = new ParseField("reset_start");
public static final ParseField RESET_END = new ParseField("reset_end");
public static final ParseField CONTENT_TYPE = new ParseField("content_type");

public static final ConstructingObjectParser<PostDataRequest, Void> PARSER =
new ConstructingObjectParser<>("post_data_request",
(a) -> new PostDataRequest((String)a[0], XContentType.fromMediaTypeOrFormat((String)a[1]), new byte[0]));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID);
PARSER.declareString(ConstructingObjectParser.constructorArg(), CONTENT_TYPE);
PARSER.declareStringOrNull(PostDataRequest::setResetEnd, RESET_END);
PARSER.declareStringOrNull(PostDataRequest::setResetStart, RESET_START);
}

private final String jobId;
private final XContentType xContentType;
private final BytesReference content;
private String resetStart;
private String resetEnd;

/**
* Create a new PostDataRequest object
*
* @param jobId non-null jobId of the job to post data to
* @param xContentType content type of the data to post. Only {@link XContentType#JSON} or {@link XContentType#SMILE} are supported
* @param content bulk serialized content in the format of the passed {@link XContentType}
*/
public PostDataRequest(String jobId, XContentType xContentType, BytesReference content) {
this.jobId = Objects.requireNonNull(jobId, "job_id must not be null");
this.xContentType = Objects.requireNonNull(xContentType, "content_type must not be null");
this.content = Objects.requireNonNull(content, "content must not be null");
}

/**
* Create a new PostDataRequest object referencing the passed {@code byte[]} content
*
* @param jobId non-null jobId of the job to post data to
* @param xContentType content type of the data to post. Only {@link XContentType#JSON} or {@link XContentType#SMILE} are supported
* @param content bulk serialized content in the format of the passed {@link XContentType}
*/
public PostDataRequest(String jobId, XContentType xContentType, byte[] content) {
this(jobId, xContentType, new BytesArray(content));
}

/**
* Create a new PostDataRequest object referencing the passed {@link JsonBuilder} object
*
* @param jobId non-null jobId of the job to post data to
* @param builder {@link JsonBuilder} object containing documents to be serialized and sent in {@link XContentType#JSON} format
*/
public PostDataRequest(String jobId, JsonBuilder builder) {
this(jobId, XContentType.JSON, builder.build());
}

public String getJobId() {
return jobId;
}

public String getResetStart() {
return resetStart;
}

/**
* Specifies the start of the bucket resetting range
*
* @param resetStart String representation of a timestamp; may be an epoch seconds, epoch millis or an ISO 8601 string
*/
public void setResetStart(String resetStart) {
this.resetStart = resetStart;
}

public String getResetEnd() {
return resetEnd;
}

/**
* Specifies the end of the bucket resetting range
*
* @param resetEnd String representation of a timestamp; may be an epoch seconds, epoch millis or an ISO 8601 string
*/
public void setResetEnd(String resetEnd) {
this.resetEnd = resetEnd;
}

public BytesReference getContent() {
return content;
}

public XContentType getXContentType() {
return xContentType;
}

@Override
public int hashCode() {
//We leave out the content for server side parity
return Objects.hash(jobId, resetStart, resetEnd, xContentType);
}

@Override
public boolean equals(Object obj) {
if(obj == this) {
return true;
}

if (obj == null || getClass() != obj.getClass()) {
return false;
}

//We leave out the content for server side parity
PostDataRequest other = (PostDataRequest) obj;
return Objects.equals(jobId, other.jobId) &&
Objects.equals(resetStart, other.resetStart) &&
Objects.equals(resetEnd, other.resetEnd) &&
Objects.equals(xContentType, other.xContentType);
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Job.ID.getPreferredName(), jobId);
builder.field(CONTENT_TYPE.getPreferredName(), xContentType.mediaType());
if (resetEnd != null) {
builder.field(RESET_END.getPreferredName(), resetEnd);
}
if (resetStart != null) {
builder.field(RESET_START.getPreferredName(), resetStart);
}
builder.endObject();
return builder;
}

/**
* Class for incrementally building a bulk document request in {@link XContentType#JSON} format
*/
public static class JsonBuilder {

private final List<ByteBuffer> bytes = new ArrayList<>();

/**
* Add a document via a {@code byte[]} array
*
* @param doc {@code byte[]} array of a serialized JSON object
*/
public JsonBuilder addDoc(byte[] doc) {
bytes.add(ByteBuffer.wrap(doc));
return this;
}

/**
* Add a document via a serialized JSON String
*
* @param doc a serialized JSON String
*/
public JsonBuilder addDoc(String doc) {
bytes.add(ByteBuffer.wrap(doc.getBytes(StandardCharsets.UTF_8)));
return this;
}

/**
* Add a document via an object map
*
* @param doc document object to add to bulk request
* @throws IOException on parsing/serialization errors
*/
public JsonBuilder addDoc(Map<String, Object> doc) throws IOException {
try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) {
builder.map(doc);
bytes.add(ByteBuffer.wrap(BytesReference.toBytes(BytesReference.bytes(builder))));
}
return this;
}

private BytesReference build() {
ByteBuffer[] buffers = bytes.toArray(new ByteBuffer[bytes.size()]);
return BytesReference.fromByteBuffers(buffers);
}

}
}
Loading

0 comments on commit 9230a48

Please sign in to comment.