HLRC: ML Add preview datafeed api (#34284)
* HLRC: ML Add preview datafeed api

* Changing deprecation handling for parser

* Removing some duplication in docs, will address other APIs in another PR
benwtrent committed Oct 4, 2018
1 parent 1f74c53 commit 6c00b91
Showing 11 changed files with 600 additions and 24 deletions.
MLRequestConverters.java
@@ -45,6 +45,7 @@
import org.elasticsearch.client.ml.GetRecordsRequest;
import org.elasticsearch.client.ml.OpenJobRequest;
import org.elasticsearch.client.ml.PostDataRequest;
import org.elasticsearch.client.ml.PreviewDatafeedRequest;
import org.elasticsearch.client.ml.PutCalendarRequest;
import org.elasticsearch.client.ml.PutDatafeedRequest;
import org.elasticsearch.client.ml.PutJobRequest;
@@ -259,6 +260,17 @@ static Request stopDatafeed(StopDatafeedRequest stopDatafeedRequest) throws IOEx
return request;
}

static Request previewDatafeed(PreviewDatafeedRequest previewDatafeedRequest) {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_xpack")
.addPathPartAsIs("ml")
.addPathPartAsIs("datafeeds")
.addPathPart(previewDatafeedRequest.getDatafeedId())
.addPathPartAsIs("_preview")
.build();
return new Request(HttpGet.METHOD_NAME, endpoint);
}

static Request deleteForecast(DeleteForecastRequest deleteForecastRequest) {
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_xpack")
MachineLearningClient.java
@@ -52,6 +52,8 @@
import org.elasticsearch.client.ml.OpenJobResponse;
import org.elasticsearch.client.ml.PostDataRequest;
import org.elasticsearch.client.ml.PostDataResponse;
import org.elasticsearch.client.ml.PreviewDatafeedRequest;
import org.elasticsearch.client.ml.PreviewDatafeedResponse;
import org.elasticsearch.client.ml.PutCalendarRequest;
import org.elasticsearch.client.ml.PutCalendarResponse;
import org.elasticsearch.client.ml.PutDatafeedRequest;
@@ -649,6 +651,49 @@ public void stopDatafeedAsync(StopDatafeedRequest request, RequestOptions option
Collections.emptySet());
}

/**
* Previews the given Machine Learning Datafeed
* <p>
* For additional info
* see <a href="http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-preview-datafeed.html">
* ML Preview Datafeed documentation</a>
*
* @param request The request to preview the datafeed
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return {@link PreviewDatafeedResponse} object containing a {@link org.elasticsearch.common.bytes.BytesReference} of the data in
* JSON format
* @throws IOException when there is a serialization issue sending the request or receiving the response
*/
public PreviewDatafeedResponse previewDatafeed(PreviewDatafeedRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request,
MLRequestConverters::previewDatafeed,
options,
PreviewDatafeedResponse::fromXContent,
Collections.emptySet());
}

/**
* Previews the given Machine Learning Datafeed asynchronously and notifies the listener on completion
* <p>
* For additional info
* see <a href="http://www.elastic.co/guide/en/elasticsearch/reference/current/ml-preview-datafeed.html">
* ML Preview Datafeed documentation</a>
*
* @param request The request to preview the datafeed
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener Listener to be notified upon request completion
*/
public void previewDatafeedAsync(PreviewDatafeedRequest request,
RequestOptions options,
ActionListener<PreviewDatafeedResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(request,
MLRequestConverters::previewDatafeed,
options,
PreviewDatafeedResponse::fromXContent,
listener,
Collections.emptySet());
}

/**
* Updates a Machine Learning {@link org.elasticsearch.client.ml.job.config.Job}
* <p>
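For context, a minimal usage sketch of the two client methods added above (not part of this commit). The client setup, the datafeed id "datafeed_1", and the listener bodies are illustrative assumptions.

import org.apache.http.HttpHost;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.ml.PreviewDatafeedRequest;
import org.elasticsearch.client.ml.PreviewDatafeedResponse;

// Assumed: a local cluster and an existing datafeed with id "datafeed_1"
RestHighLevelClient client = new RestHighLevelClient(
        RestClient.builder(new HttpHost("localhost", 9200, "http")));
PreviewDatafeedRequest request = new PreviewDatafeedRequest("datafeed_1");

// Synchronous call (declares IOException): blocks until the preview JSON is returned
PreviewDatafeedResponse response = client.machineLearning().previewDatafeed(request, RequestOptions.DEFAULT);

// Asynchronous call: the listener is notified on completion or failure
client.machineLearning().previewDatafeedAsync(request, RequestOptions.DEFAULT,
        new ActionListener<PreviewDatafeedResponse>() {
            @Override
            public void onResponse(PreviewDatafeedResponse previewResponse) {
                // e.g. inspect previewResponse.getDataList()
            }

            @Override
            public void onFailure(Exception e) {
                // handle the failure
            }
        });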
PreviewDatafeedRequest.java
@@ -0,0 +1,100 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.client.ml.datafeed.DatafeedConfig;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Objects;

/**
* Request to preview a Machine Learning Datafeed
*/
public class PreviewDatafeedRequest extends ActionRequest implements ToXContentObject {

public static final ConstructingObjectParser<PreviewDatafeedRequest, Void> PARSER = new ConstructingObjectParser<>(
"open_datafeed_request", true, a -> new PreviewDatafeedRequest((String) a[0]));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), DatafeedConfig.ID);
}

public static PreviewDatafeedRequest fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}

private final String datafeedId;

/**
* Create a new request with the desired datafeedId
*
* @param datafeedId unique datafeedId, must not be null
*/
public PreviewDatafeedRequest(String datafeedId) {
this.datafeedId = Objects.requireNonNull(datafeedId, "[datafeed_id] must not be null");
}

public String getDatafeedId() {
return datafeedId;
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(DatafeedConfig.ID.getPreferredName(), datafeedId);
builder.endObject();
return builder;
}

@Override
public String toString() {
return Strings.toString(this);
}

@Override
public int hashCode() {
return Objects.hash(datafeedId);
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}

if (other == null || getClass() != other.getClass()) {
return false;
}

PreviewDatafeedRequest that = (PreviewDatafeedRequest) other;
return Objects.equals(datafeedId, that.datafeedId);
}
}
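As a side note (not part of the diff), a quick sketch of how this request renders through its ToXContentObject implementation; the id "datafeed_1" is an arbitrary example value.

import org.elasticsearch.client.ml.PreviewDatafeedRequest;
import org.elasticsearch.common.Strings;

PreviewDatafeedRequest request = new PreviewDatafeedRequest("datafeed_1"); // arbitrary example id
// Strings.toString(...) drives toXContent, yielding: {"datafeed_id":"datafeed_1"}
String json = Strings.toString(request);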
PreviewDatafeedResponse.java
@@ -0,0 +1,113 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml;

import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

/**
* Response containing a datafeed preview in JSON format
*/
public class PreviewDatafeedResponse extends ActionResponse implements ToXContentObject {

private BytesReference preview;

public static PreviewDatafeedResponse fromXContent(XContentParser parser) throws IOException {
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
parser.nextToken();
builder.copyCurrentStructure(parser);
return new PreviewDatafeedResponse(BytesReference.bytes(builder));
}
}

public PreviewDatafeedResponse(BytesReference preview) {
this.preview = preview;
}

public BytesReference getPreview() {
return preview;
}

/**
* Parses the preview to a list of {@link Map} objects
* @return List of previewed data
* @throws IOException If there is a parsing issue with the {@link BytesReference}
* @throws java.lang.ClassCastException If casting the raw {@link Object} entries to a {@link Map} fails
*/
@SuppressWarnings("unchecked")
public List<Map<String, Object>> getDataList() throws IOException {
try(StreamInput streamInput = preview.streamInput();
XContentParser parser = XContentType.JSON.xContent()
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, streamInput)) {
XContentParser.Token token = parser.nextToken();
if (token == XContentParser.Token.START_ARRAY) {
return parser.listOrderedMap().stream().map(obj -> (Map<String, Object>)obj).collect(Collectors.toList());
} else {
return Collections.singletonList(parser.mapOrdered());
}
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
try (InputStream stream = preview.streamInput()) {
builder.rawValue(stream, XContentType.JSON);
}
return builder;
}

@Override
public int hashCode() {
return Objects.hash(preview);
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
PreviewDatafeedResponse other = (PreviewDatafeedResponse) obj;
return Objects.equals(preview, other.preview);
}

@Override
public final String toString() {
return Strings.toString(this);
}
}
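A short sketch (not part of the commit) of what getDataList() returns, assuming a hand-built preview payload wrapped in a BytesArray; the two documents and the "total" field are example data.

import org.elasticsearch.client.ml.PreviewDatafeedResponse;
import org.elasticsearch.common.bytes.BytesArray;
import java.util.List;
import java.util.Map;

// Example payload: a JSON array of two documents (getDataList declares IOException)
PreviewDatafeedResponse response =
        new PreviewDatafeedResponse(new BytesArray("[{\"total\":10},{\"total\":20}]"));
List<Map<String, Object>> rows = response.getDataList(); // two maps, each with a "total" entry
long sum = rows.stream().mapToLong(r -> ((Number) r.get("total")).longValue()).sum(); // 30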
MLRequestConvertersTests.java
@@ -41,6 +41,7 @@
import org.elasticsearch.client.ml.GetRecordsRequest;
import org.elasticsearch.client.ml.OpenJobRequest;
import org.elasticsearch.client.ml.PostDataRequest;
import org.elasticsearch.client.ml.PreviewDatafeedRequest;
import org.elasticsearch.client.ml.PutCalendarRequest;
import org.elasticsearch.client.ml.PutDatafeedRequest;
import org.elasticsearch.client.ml.PutJobRequest;
@@ -293,6 +294,13 @@ public void testStopDatafeed() throws Exception {
}
}

public void testPreviewDatafeed() {
PreviewDatafeedRequest datafeedRequest = new PreviewDatafeedRequest("datafeed_1");
Request request = MLRequestConverters.previewDatafeed(datafeedRequest);
assertEquals(HttpGet.METHOD_NAME, request.getMethod());
assertEquals("/_xpack/ml/datafeeds/" + datafeedRequest.getDatafeedId() + "/_preview", request.getEndpoint());
}

public void testDeleteForecast() {
String jobId = randomAlphaOfLength(10);
DeleteForecastRequest deleteForecastRequest = new DeleteForecastRequest(jobId);
MachineLearningIT.java
@@ -49,6 +49,8 @@
import org.elasticsearch.client.ml.OpenJobResponse;
import org.elasticsearch.client.ml.PostDataRequest;
import org.elasticsearch.client.ml.PostDataResponse;
import org.elasticsearch.client.ml.PreviewDatafeedRequest;
import org.elasticsearch.client.ml.PreviewDatafeedResponse;
import org.elasticsearch.client.ml.PutCalendarRequest;
import org.elasticsearch.client.ml.PutCalendarResponse;
import org.elasticsearch.client.ml.PutDatafeedRequest;
@@ -76,8 +78,11 @@
import org.junit.After;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -564,6 +569,56 @@ public void testStopDatafeed() throws Exception {
}
}

public void testPreviewDatafeed() throws Exception {
String jobId = "test-preview-datafeed";
String indexName = "preview_data_1";

// Set up the index and docs
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
createIndexRequest.mapping("doc", "timestamp", "type=date", "total", "type=long");
highLevelClient().indices().create(createIndexRequest, RequestOptions.DEFAULT);
BulkRequest bulk = new BulkRequest();
bulk.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
long now = (System.currentTimeMillis()/1000)*1000;
long thePast = now - 60000;
int i = 0;
List<Integer> totalTotals = new ArrayList<>(60);
while(thePast < now) {
Integer total = randomInt(1000);
IndexRequest doc = new IndexRequest();
doc.index(indexName);
doc.type("doc");
doc.id("id" + i);
doc.source("{\"total\":" + total + ",\"timestamp\":"+ thePast +"}", XContentType.JSON);
bulk.add(doc);
thePast += 1000;
i++;
totalTotals.add(total);
}
highLevelClient().bulk(bulk, RequestOptions.DEFAULT);

MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
// create the job and the datafeed
Job job = buildJob(jobId);
putJob(job);
openJob(job);

String datafeedId = jobId + "-feed";
DatafeedConfig datafeed = DatafeedConfig.builder(datafeedId, jobId)
.setIndices(indexName)
.setQueryDelay(TimeValue.timeValueSeconds(1))
.setTypes(Collections.singletonList("doc"))
.setFrequency(TimeValue.timeValueSeconds(1)).build();
machineLearningClient.putDatafeed(new PutDatafeedRequest(datafeed), RequestOptions.DEFAULT);

PreviewDatafeedResponse response = execute(new PreviewDatafeedRequest(datafeedId),
machineLearningClient::previewDatafeed,
machineLearningClient::previewDatafeedAsync);

Integer[] totals = response.getDataList().stream().map(map -> (Integer)map.get("total")).toArray(Integer[]::new);
assertThat(totalTotals, containsInAnyOrder(totals));
}

public void testDeleteForecast() throws Exception {
String jobId = "test-delete-forecast";

