diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/XContentObjectTransformer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/XContentObjectTransformer.java
index 5d25b9d71e618..9749f56566036 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/XContentObjectTransformer.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/XContentObjectTransformer.java
@@ -46,7 +46,8 @@ public class XContentObjectTransformer {
     public static XContentObjectTransformer<AggregatorFactories.Builder> aggregatorTransformer() {
         return new XContentObjectTransformer<>(searchRegistry, (p) -> {
             // Serializing a map creates an object, need to skip the start object for the aggregation parser
-            assert(XContentParser.Token.START_OBJECT.equals(p.nextToken()));
+            XContentParser.Token token = p.nextToken();
+            assert(XContentParser.Token.START_OBJECT.equals(token));
             return AggregatorFactories.parseAggregators(p);
         });
     }
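Reviewer note on the change above: the old code advanced the parser inside `assert`, so with assertions disabled (the JVM default outside tests) `p.nextToken()` never runs and `parseAggregators` starts on the wrong token. Pulling the side effect out of the assert fixes that. A minimal, self-contained sketch of the pitfall — `Parser` here is a hypothetical stand-in for `XContentParser`:

```java
// Hypothetical stand-in for XContentParser: nextToken() has a side effect.
class Parser {
    private int position = 0;

    int nextToken() {
        return ++position; // advances the parser
    }

    int position() {
        return position;
    }
}

public class AssertSideEffect {
    public static void main(String[] args) {
        Parser p = new Parser();

        // BAD: when run without -ea, the whole expression is elided and the
        // parser never advances past the synthetic START_OBJECT.
        assert p.nextToken() == 1;

        // GOOD: the side effect always runs; only the comparison is elidable.
        int token = p.nextToken();
        assert token == 2; // holds under -ea, where the BAD line also ran

        // Prints "position = 2" with -ea but "position = 1" without: the old
        // code's behavior depended on whether assertions were enabled.
        System.out.println("position = " + p.position());
    }
}
```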
diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java
index b2743c7370976..e75aaa79253a8 100644
--- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java
+++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java
@@ -8,28 +8,38 @@
 import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator;
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.Version;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.DeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentParseException;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.TermQueryBuilder;
 import org.elasticsearch.script.Script;
 import org.elasticsearch.search.SearchModule;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders;
 import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
 import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.avg.AvgAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
+import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptPipelineAggregationBuilder;
+import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregationBuilder;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.builder.SearchSourceBuilder.ScriptField;
 import org.elasticsearch.test.AbstractSerializingTestCase;
@@ -83,7 +93,7 @@ public static DatafeedConfig createRandomizedDatafeedConfig(String jobId, long b
         if (randomBoolean() && addScriptFields == false) {
             // can only test with a single agg as the xcontent order gets randomized by test base class and then
             // the actual xcontent isn't the same and test fail.
-            // Testing with a single agg is ok as we don't have special list writeable / xconent logic
+            // Testing with a single agg is ok as we don't have special list writeable / xcontent logic
             AggregatorFactories.Builder aggs = new AggregatorFactories.Builder();
             aggHistogramInterval = randomNonNegativeLong();
             aggHistogramInterval = aggHistogramInterval > bucketSpanMillis ? bucketSpanMillis : aggHistogramInterval;
@@ -567,6 +577,98 @@ public void testDefaultFrequency_GivenAggregationsWithHistogramInterval_1_Hour()
         assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueHours(12)));
     }
 
+    public void testSerializationOfComplexAggs() throws IOException {
+        MaxAggregationBuilder maxTime = AggregationBuilders.max("timestamp").field("timestamp");
+        AvgAggregationBuilder avgAggregationBuilder = AggregationBuilders.avg("bytes_in_avg").field("system.network.in.bytes");
+        DerivativePipelineAggregationBuilder derivativePipelineAggregationBuilder =
+            PipelineAggregatorBuilders.derivative("bytes_in_derivative", "bytes_in_avg");
+        BucketScriptPipelineAggregationBuilder bucketScriptPipelineAggregationBuilder =
+            PipelineAggregatorBuilders.bucketScript("non_negative_bytes",
+                Collections.singletonMap("bytes", "bytes_in_derivative"),
+                new Script("params.bytes > 0 ? params.bytes : null"));
+        DateHistogramAggregationBuilder dateHistogram =
+            AggregationBuilders.dateHistogram("histogram_buckets")
+                .field("timestamp").interval(300000).timeZone(DateTimeZone.UTC)
+                .subAggregation(maxTime)
+                .subAggregation(avgAggregationBuilder)
+                .subAggregation(derivativePipelineAggregationBuilder)
+                .subAggregation(bucketScriptPipelineAggregationBuilder);
+        DatafeedConfig.Builder datafeedConfigBuilder = createDatafeedBuilderWithDateHistogram(dateHistogram);
+        QueryBuilder terms =
+            new BoolQueryBuilder().filter(new TermQueryBuilder(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10)));
+        datafeedConfigBuilder.setParsedQuery(terms);
+        DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
+        AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder().addAggregator(dateHistogram);
+
+
+        XContentType xContentType = XContentType.JSON;
+        BytesReference bytes = XContentHelper.toXContent(datafeedConfig, xContentType, false);
+        XContentParser parser = XContentHelper.createParser(xContentRegistry(),
+            DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
+            bytes,
+            xContentType);
+
+        DatafeedConfig parsedDatafeedConfig = doParseInstance(parser);
+        assertEquals(datafeedConfig, parsedDatafeedConfig);
+
+        // Assert that the parsed versions of our aggs and queries work as well
+        assertEquals(aggBuilder, parsedDatafeedConfig.getParsedAggregations());
+        assertEquals(terms, parsedDatafeedConfig.getParsedQuery());
+
+        try (BytesStreamOutput output = new BytesStreamOutput()) {
+            datafeedConfig.writeTo(output);
+            try (StreamInput streamInput = output.bytes().streamInput()) {
+                DatafeedConfig streamedDatafeedConfig = new DatafeedConfig(streamInput);
+                assertEquals(datafeedConfig, streamedDatafeedConfig);
+
+                // Assert that the parsed versions of our aggs and queries work as well
+                assertEquals(aggBuilder, streamedDatafeedConfig.getParsedAggregations());
+                assertEquals(terms, streamedDatafeedConfig.getParsedQuery());
+            }
+        }
+    }
+
+    public void testSerializationOfComplexAggsBetweenVersions() throws IOException {
+        MaxAggregationBuilder maxTime = AggregationBuilders.max("timestamp").field("timestamp");
+        AvgAggregationBuilder avgAggregationBuilder = AggregationBuilders.avg("bytes_in_avg").field("system.network.in.bytes");
+        DerivativePipelineAggregationBuilder derivativePipelineAggregationBuilder =
+            PipelineAggregatorBuilders.derivative("bytes_in_derivative", "bytes_in_avg");
+        BucketScriptPipelineAggregationBuilder bucketScriptPipelineAggregationBuilder =
+            PipelineAggregatorBuilders.bucketScript("non_negative_bytes",
+                Collections.singletonMap("bytes", "bytes_in_derivative"),
+                new Script("params.bytes > 0 ? params.bytes : null"));
+        DateHistogramAggregationBuilder dateHistogram =
+            AggregationBuilders.dateHistogram("histogram_buckets")
+                .field("timestamp").interval(300000).timeZone(DateTimeZone.UTC)
+                .subAggregation(maxTime)
+                .subAggregation(avgAggregationBuilder)
+                .subAggregation(derivativePipelineAggregationBuilder)
+                .subAggregation(bucketScriptPipelineAggregationBuilder);
+        DatafeedConfig.Builder datafeedConfigBuilder = createDatafeedBuilderWithDateHistogram(dateHistogram);
+        QueryBuilder terms =
+            new BoolQueryBuilder().filter(new TermQueryBuilder(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10)));
+        datafeedConfigBuilder.setParsedQuery(terms);
+        DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
+
+        SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
+        NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(searchModule.getNamedWriteables());
+
+        try (BytesStreamOutput output = new BytesStreamOutput()) {
+            output.setVersion(Version.V_6_0_0);
+            datafeedConfig.writeTo(output);
+            try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), namedWriteableRegistry)) {
+                in.setVersion(Version.V_6_0_0);
+                DatafeedConfig streamedDatafeedConfig = new DatafeedConfig(in);
+                assertEquals(datafeedConfig, streamedDatafeedConfig);
+
+                // Assert that the parsed versions of our aggs and queries work as well
+                assertEquals(new AggregatorFactories.Builder().addAggregator(dateHistogram),
+                    streamedDatafeedConfig.getParsedAggregations());
+                assertEquals(terms, streamedDatafeedConfig.getParsedQuery());
+            }
+        }
+    }
+
     public static String randomValidDatafeedId() {
         CodepointSetGenerator generator = new CodepointSetGenerator("abcdefghijklmnopqrstuvwxyz".toCharArray());
         return generator.ofCodePointsLength(random(), 10, 10);
@@ -590,14 +692,18 @@ private static DatafeedConfig createDatafeedWithDateHistogram(Long interval) {
         return createDatafeedWithDateHistogram(dateHistogram);
     }
 
-    private static DatafeedConfig createDatafeedWithDateHistogram(DateHistogramAggregationBuilder dateHistogram) {
+    private static DatafeedConfig.Builder createDatafeedBuilderWithDateHistogram(DateHistogramAggregationBuilder dateHistogram) {
         DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed1", "job1");
         builder.setIndices(Collections.singletonList("myIndex"));
         builder.setTypes(Collections.singletonList("myType"));
         AggregatorFactories.Builder aggs = new AggregatorFactories.Builder().addAggregator(dateHistogram);
         DatafeedConfig.validateAggregations(aggs);
         builder.setParsedAggregations(aggs);
-        return builder.build();
+        return builder;
+    }
+
+    private static DatafeedConfig createDatafeedWithDateHistogram(DateHistogramAggregationBuilder dateHistogram) {
+        return createDatafeedBuilderWithDateHistogram(dateHistogram).build();
     }
 
     @Override
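The version round trip in `testSerializationOfComplexAggsBetweenVersions` above follows the standard BWC pattern: serialize at the wire version an older node would negotiate, then deserialize with the same version set on the input. A generic sketch of that pattern using only the stock `Writeable` machinery — the `roundTrip` helper itself is hypothetical, and for named writeables you would wrap the input in `NamedWriteableAwareStreamInput` exactly as the test does:

```java
import java.io.IOException;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;

// Hypothetical helper distilling the BWC round trip used by the test.
public final class BwcRoundTrip {

    private BwcRoundTrip() {}

    public static <T extends Writeable> T roundTrip(T original,
                                                    Writeable.Reader<T> reader,
                                                    Version version) throws IOException {
        try (BytesStreamOutput output = new BytesStreamOutput()) {
            output.setVersion(version);   // write as if talking to an old node
            original.writeTo(output);
            try (StreamInput in = output.bytes().streamInput()) {
                in.setVersion(version);   // read back at the matching version
                return reader.read(in);
            }
        }
    }
}
```

In the test's terms this would be roughly `assertEquals(datafeedConfig, roundTrip(datafeedConfig, DatafeedConfig::new, Version.V_6_0_0))`.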
diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yml
index 6e11a1dddc71d..d9e4c5ba6410a 100644
--- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yml
+++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yml
@@ -11,7 +11,8 @@ setup:
             "job_id":"datafeeds-crud-1",
             "analysis_config" : {
                 "bucket_span": "1h",
-                "detectors" :[{"function":"count"}]
+                "detectors" :[{"function":"count"}],
+                "summary_count_field_name": "doc_count"
             },
             "data_description" : {
                 "format":"xcontent",
@@ -321,6 +322,61 @@ setup:
   - match: { chunking_config.mode: "manual" }
   - match: { chunking_config.time_span: "1h" }
 
+---
+"Test put datafeed with aggregations":
+  - do:
+      xpack.ml.put_datafeed:
+        datafeed_id: test-datafeed-aggs-1
+        body: >
+          {
+            "job_id":"datafeeds-crud-1",
+            "indices":["index-foo"],
+            "types":["type-bar"],
+            "aggs": {
+              "histogram_buckets":{
+                "date_histogram": {
+                  "field": "@timestamp",
+                  "interval": "5m",
+                  "time_zone": "UTC",
+                  "min_doc_count": 0
+                },
+                "aggs": {
+                  "@timestamp": {
+                    "max": {
+                      "field": "@timestamp"
+                    }
+                  },
+                  "bytes_in_avg": {
+                    "avg": {
+                      "field": "system.network.in.bytes"
+                    }
+                  },
+                  "bytes_in_derivative": {
+                    "derivative": {
+                      "buckets_path": "bytes_in_avg"
+                    }
+                  },
+                  "non_negative_bytes": {
+                    "bucket_script": {
+                      "buckets_path": {
+                        "bytes": "bytes_in_derivative"
+                      },
+                      "script": "params.bytes > 0 ? params.bytes : null"
+                    }
+                  }
+                }
+              }
+            }
+          }
+
+  - do:
+      xpack.ml.get_datafeeds:
+        datafeed_id: test-datafeed-aggs-1
+  - match: { datafeeds.0.datafeed_id: "test-datafeed-aggs-1" }
+  - match: { datafeeds.0.aggregations.histogram_buckets.date_histogram.field: "@timestamp" }
+  - match: { datafeeds.0.aggregations.histogram_buckets.aggregations.@timestamp.max.field: "@timestamp" }
+  - match: { datafeeds.0.aggregations.histogram_buckets.aggregations.bytes_in_avg.avg.field: "system.network.in.bytes" }
+  - match: { datafeeds.0.aggregations.histogram_buckets.aggregations.non_negative_bytes.bucket_script.buckets_path.bytes: "bytes_in_derivative" }
+
 ---
 "Test delete datafeed":
   - do:
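Two notes on the REST test above. First, the `summary_count_field_name: doc_count` added to the setup job is what makes the aggregated datafeed valid: ML requires a datafeed with aggregations to target a job whose `summary_count_field_name` is `doc_count`. Second, the GET assertions exercise the same path the production change fixes: the stored `aggs` map is re-serialized and parsed through `aggregatorTransformer()`, which must now consume the synthetic `START_OBJECT` unconditionally. A sketch of that parsing step, assuming `XContentObjectTransformer#fromMap` as the entry point the datafeed's lazy parsing goes through:

```java
import java.io.IOException;
import java.util.Map;

import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.xpack.core.ml.utils.XContentObjectTransformer;

public class AggMapParsing {

    // Turns a stored "aggs" map (as returned by GET datafeed) back into builders.
    // Serializing the map emits a START_OBJECT token, which aggregatorTransformer()
    // now skips via an explicit nextToken() rather than inside an assert.
    public static AggregatorFactories.Builder parse(Map<String, Object> aggsAsMap) throws IOException {
        XContentObjectTransformer<AggregatorFactories.Builder> transformer =
            XContentObjectTransformer.aggregatorTransformer();
        return transformer.fromMap(aggsAsMap);
    }
}
```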