Adding additional tests for agg parsing in datafeedconfig (#36261)
* Adding additional tests for agg parsing in datafeedconfig

* Fixing bug, adding yml test
benwtrent committed Dec 6, 2018
1 parent 5f3893f commit a553168
Showing 3 changed files with 168 additions and 5 deletions.
@@ -46,7 +46,8 @@ public class XContentObjectTransformer<T extends ToXContentObject> {
public static XContentObjectTransformer<AggregatorFactories.Builder> aggregatorTransformer() {
return new XContentObjectTransformer<>(searchRegistry, (p) -> {
// Serializing a map creates an object, need to skip the start object for the aggregation parser
- assert(XContentParser.Token.START_OBJECT.equals(p.nextToken()));
+ XContentParser.Token token = p.nextToken();
+ assert(XContentParser.Token.START_OBJECT.equals(token));
return AggregatorFactories.parseAggregators(p);
});
}
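
The change above is not cosmetic. In Java, the argument of an assert statement is evaluated only when assertions are enabled (java -ea); with assertions disabled, the JVM default in production, the original one-liner never invoked p.nextToken(), so the parser was never advanced past the opening token before AggregatorFactories.parseAggregators ran. A minimal sketch of that failure mode (the class and member names are illustrative, not from the commit):

public class AssertSideEffectDemo {
    private int position = 0;

    // Stand-in for XContentParser.nextToken(): advancing the cursor is a side effect.
    private int nextToken() {
        return ++position;
    }

    public static void main(String[] args) {
        AssertSideEffectDemo parser = new AssertSideEffectDemo();
        // BUG: with assertions disabled (-da, the default), this whole line is skipped.
        assert parser.nextToken() == 1;
        // Prints "position = 1" under -ea, but "position = 0" under -da.
        System.out.println("position = " + parser.position);
    }
}

The fix evaluates nextToken() unconditionally and keeps only the comparison inside the assert, which is exactly what the two added lines do.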
@@ -8,28 +8,38 @@
import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParseException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.avg.AvgAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder.ScriptField;
import org.elasticsearch.test.AbstractSerializingTestCase;
@@ -83,7 +93,7 @@ public static DatafeedConfig createRandomizedDatafeedConfig(String jobId, long b
if (randomBoolean() && addScriptFields == false) {
// can only test with a single agg as the xcontent order gets randomized by the test base class and then
// the actual xcontent isn't the same and the test fails.
- // Testing with a single agg is ok as we don't have special list writeable / xconent logic
+ // Testing with a single agg is ok as we don't have special list writeable / xcontent logic
AggregatorFactories.Builder aggs = new AggregatorFactories.Builder();
aggHistogramInterval = randomNonNegativeLong();
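// Clamp the random interval so it never exceeds the bucket span.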
aggHistogramInterval = aggHistogramInterval > bucketSpanMillis ? bucketSpanMillis : aggHistogramInterval;
@@ -567,6 +577,98 @@ public void testDefaultFrequency_GivenAggregationsWithHistogramInterval_1_Hour()
assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueHours(12)));
}

public void testSerializationOfComplexAggs() throws IOException {
MaxAggregationBuilder maxTime = AggregationBuilders.max("timestamp").field("timestamp");
AvgAggregationBuilder avgAggregationBuilder = AggregationBuilders.avg("bytes_in_avg").field("system.network.in.bytes");
DerivativePipelineAggregationBuilder derivativePipelineAggregationBuilder =
PipelineAggregatorBuilders.derivative("bytes_in_derivative", "bytes_in_avg");
BucketScriptPipelineAggregationBuilder bucketScriptPipelineAggregationBuilder =
PipelineAggregatorBuilders.bucketScript("non_negative_bytes",
Collections.singletonMap("bytes", "bytes_in_derivative"),
new Script("params.bytes > 0 ? params.bytes : null"));
DateHistogramAggregationBuilder dateHistogram =
AggregationBuilders.dateHistogram("histogram_buckets")
.field("timestamp").interval(300000).timeZone(DateTimeZone.UTC)
.subAggregation(maxTime)
.subAggregation(avgAggregationBuilder)
.subAggregation(derivativePipelineAggregationBuilder)
.subAggregation(bucketScriptPipelineAggregationBuilder);
DatafeedConfig.Builder datafeedConfigBuilder = createDatafeedBuilderWithDateHistogram(dateHistogram);
QueryBuilder terms =
new BoolQueryBuilder().filter(new TermQueryBuilder(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10)));
datafeedConfigBuilder.setParsedQuery(terms);
DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder().addAggregator(dateHistogram);


XContentType xContentType = XContentType.JSON;
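// Round trip 1: render the config to JSON and parse it back.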
BytesReference bytes = XContentHelper.toXContent(datafeedConfig, xContentType, false);
XContentParser parser = XContentHelper.createParser(xContentRegistry(),
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
bytes,
xContentType);

DatafeedConfig parsedDatafeedConfig = doParseInstance(parser);
assertEquals(datafeedConfig, parsedDatafeedConfig);

// Assert that the parsed versions of our aggs and queries work as well
assertEquals(aggBuilder, parsedDatafeedConfig.getParsedAggregations());
assertEquals(terms, parsedDatafeedConfig.getParsedQuery());

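// Round trip 2: serialize with the transport wire format and read it back.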
try (BytesStreamOutput output = new BytesStreamOutput()) {
datafeedConfig.writeTo(output);
try (StreamInput streamInput = output.bytes().streamInput()) {
DatafeedConfig streamedDatafeedConfig = new DatafeedConfig(streamInput);
assertEquals(datafeedConfig, streamedDatafeedConfig);

// Assert that the parsed versions of our aggs and queries work as well
assertEquals(aggBuilder, streamedDatafeedConfig.getParsedAggregations());
assertEquals(terms, streamedDatafeedConfig.getParsedQuery());
}
}
}

public void testSerializationOfComplexAggsBetweenVersions() throws IOException {
MaxAggregationBuilder maxTime = AggregationBuilders.max("timestamp").field("timestamp");
AvgAggregationBuilder avgAggregationBuilder = AggregationBuilders.avg("bytes_in_avg").field("system.network.in.bytes");
DerivativePipelineAggregationBuilder derivativePipelineAggregationBuilder =
PipelineAggregatorBuilders.derivative("bytes_in_derivative", "bytes_in_avg");
BucketScriptPipelineAggregationBuilder bucketScriptPipelineAggregationBuilder =
PipelineAggregatorBuilders.bucketScript("non_negative_bytes",
Collections.singletonMap("bytes", "bytes_in_derivative"),
new Script("params.bytes > 0 ? params.bytes : null"));
DateHistogramAggregationBuilder dateHistogram =
AggregationBuilders.dateHistogram("histogram_buckets")
.field("timestamp").interval(300000).timeZone(DateTimeZone.UTC)
.subAggregation(maxTime)
.subAggregation(avgAggregationBuilder)
.subAggregation(derivativePipelineAggregationBuilder)
.subAggregation(bucketScriptPipelineAggregationBuilder);
DatafeedConfig.Builder datafeedConfigBuilder = createDatafeedBuilderWithDateHistogram(dateHistogram);
QueryBuilder terms =
new BoolQueryBuilder().filter(new TermQueryBuilder(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10)));
datafeedConfigBuilder.setParsedQuery(terms);
DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();

SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(searchModule.getNamedWriteables());
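// The registry is needed so the stream can resolve named writeables (e.g. the bool and term query builders) while reading the config back.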

try (BytesStreamOutput output = new BytesStreamOutput()) {
output.setVersion(Version.V_6_0_0);
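// Write with the 6.0.0 wire format, as if sending the config to an older node in a mixed-version cluster.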
datafeedConfig.writeTo(output);
try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), namedWriteableRegistry)) {
in.setVersion(Version.V_6_0_0);
DatafeedConfig streamedDatafeedConfig = new DatafeedConfig(in);
assertEquals(datafeedConfig, streamedDatafeedConfig);

// Assert that the parsed versions of our aggs and queries work as well
assertEquals(new AggregatorFactories.Builder().addAggregator(dateHistogram),
streamedDatafeedConfig.getParsedAggregations());
assertEquals(terms, streamedDatafeedConfig.getParsedQuery());
}
}
}

public static String randomValidDatafeedId() {
CodepointSetGenerator generator = new CodepointSetGenerator("abcdefghijklmnopqrstuvwxyz".toCharArray());
return generator.ofCodePointsLength(random(), 10, 10);
@@ -590,14 +692,18 @@ private static DatafeedConfig createDatafeedWithDateHistogram(Long interval) {
return createDatafeedWithDateHistogram(dateHistogram);
}

- private static DatafeedConfig createDatafeedWithDateHistogram(DateHistogramAggregationBuilder dateHistogram) {
+ private static DatafeedConfig.Builder createDatafeedBuilderWithDateHistogram(DateHistogramAggregationBuilder dateHistogram) {
DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed1", "job1");
builder.setIndices(Collections.singletonList("myIndex"));
builder.setTypes(Collections.singletonList("myType"));
AggregatorFactories.Builder aggs = new AggregatorFactories.Builder().addAggregator(dateHistogram);
DatafeedConfig.validateAggregations(aggs);
builder.setParsedAggregations(aggs);
- return builder.build();
+ return builder;
}

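// Convenience wrapper kept for existing callers; new tests use the builder variant above so they can customize the config (e.g. set a query) before building.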
private static DatafeedConfig createDatafeedWithDateHistogram(DateHistogramAggregationBuilder dateHistogram) {
return createDatafeedBuilderWithDateHistogram(dateHistogram).build();
}

@Override
@@ -11,7 +11,8 @@ setup:
"job_id":"datafeeds-crud-1",
"analysis_config" : {
"bucket_span": "1h",
"detectors" :[{"function":"count"}]
"detectors" :[{"function":"count"}],
"summary_count_field_name": "doc_count"
},
"data_description" : {
"format":"xcontent",
@@ -321,6 +322,61 @@ setup:
- match: { chunking_config.mode: "manual" }
- match: { chunking_config.time_span: "1h" }

---
"Test put datafeed with aggregations":
- do:
xpack.ml.put_datafeed:
datafeed_id: test-datafeed-aggs-1
body: >
{
"job_id":"datafeeds-crud-1",
"indices":["index-foo"],
"types":["type-bar"],
"aggs": {
"histogram_buckets":{
"date_histogram": {
"field": "@timestamp",
"interval": "5m",
"time_zone": "UTC",
"min_doc_count": 0
},
"aggs": {
"@timestamp": {
"max": {
"field": "@timestamp"
}
},
"bytes_in_avg": {
"avg": {
"field": "system.network.in.bytes"
}
},
"bytes_in_derivative": {
"derivative": {
"buckets_path": "bytes_in_avg"
}
},
"non_negative_bytes": {
"bucket_script": {
"buckets_path": {
"bytes": "bytes_in_derivative"
},
"script": "params.bytes > 0 ? params.bytes : null"
}
}
}
}
}
}
- do:
xpack.ml.get_datafeeds:
datafeed_id: test-datafeed-aggs-1
- match: { datafeeds.0.datafeed_id: "test-datafeed-aggs-1" }
- match: { datafeeds.0.aggregations.histogram_buckets.date_histogram.field: "@timestamp" }
- match: { datafeeds.0.aggregations.histogram_buckets.aggregations.@timestamp.max.field: "@timestamp" }
- match: { datafeeds.0.aggregations.histogram_buckets.aggregations.bytes_in_avg.avg.field: "system.network.in.bytes" }
- match: { datafeeds.0.aggregations.histogram_buckets.aggregations.non_negative_bytes.bucket_script.buckets_path.bytes: "bytes_in_derivative" }
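# Note: the PUT body used the "aggs" shorthand, but the GET response reports them under the canonical "aggregations" key, which is what the matches above assert.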

---
"Test delete datafeed":
- do: