Skip to content

Commit

Permalink
Remove explicit SearchResponse references from server bucket aggs (pa…
Browse files Browse the repository at this point in the history
…rt4) (#102038)
  • Loading branch information
iverase authored Nov 12, 2023
1 parent e7f7e18 commit 182e749
Show file tree
Hide file tree
Showing 7 changed files with 1,866 additions and 1,765 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
package org.elasticsearch.search.aggregations.bucket;

import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.sampler.random.InternalRandomSampler;
import org.elasticsearch.search.aggregations.bucket.sampler.random.RandomSamplerAggregationBuilder;
Expand All @@ -24,6 +22,7 @@

import static org.elasticsearch.search.aggregations.AggregationBuilders.avg;
import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.lessThan;

Expand Down Expand Up @@ -87,40 +86,47 @@ public void setupSuiteScopeCluster() throws Exception {
}

public void testRandomSampler() {
double sampleMonotonicValue = 0.0;
double sampleNumericValue = 0.0;
double sampledDocCount = 0.0;
double[] sampleMonotonicValue = new double[1];
double[] sampleNumericValue = new double[1];
double[] sampledDocCount = new double[1];

for (int i = 0; i < NUM_SAMPLE_RUNS; i++) {
SearchRequest sampledRequest = prepareSearch("idx").addAggregation(
new RandomSamplerAggregationBuilder("sampler").setProbability(PROBABILITY)
.subAggregation(avg("mean_monotonic").field(MONOTONIC_VALUE))
.subAggregation(avg("mean_numeric").field(NUMERIC_VALUE))
).request();
InternalRandomSampler sampler = client().search(sampledRequest).actionGet().getAggregations().get("sampler");
sampleMonotonicValue += ((Avg) sampler.getAggregations().get("mean_monotonic")).getValue();
sampleNumericValue += ((Avg) sampler.getAggregations().get("mean_numeric")).getValue();
sampledDocCount += sampler.getDocCount();
assertResponse(
prepareSearch("idx").addAggregation(
new RandomSamplerAggregationBuilder("sampler").setProbability(PROBABILITY)
.subAggregation(avg("mean_monotonic").field(MONOTONIC_VALUE))
.subAggregation(avg("mean_numeric").field(NUMERIC_VALUE))
),
response -> {
InternalRandomSampler sampler = response.getAggregations().get("sampler");
sampleMonotonicValue[0] += ((Avg) sampler.getAggregations().get("mean_monotonic")).getValue();
sampleNumericValue[0] += ((Avg) sampler.getAggregations().get("mean_numeric")).getValue();
sampledDocCount[0] += sampler.getDocCount();
}
);
}
sampledDocCount /= NUM_SAMPLE_RUNS;
sampleMonotonicValue /= NUM_SAMPLE_RUNS;
sampleNumericValue /= NUM_SAMPLE_RUNS;
sampledDocCount[0] /= NUM_SAMPLE_RUNS;
sampleMonotonicValue[0] /= NUM_SAMPLE_RUNS;
sampleNumericValue[0] /= NUM_SAMPLE_RUNS;
double expectedDocCount = PROBABILITY * numDocs;
// We're taking the mean of NUM_SAMPLE_RUNS for which each run has standard deviation
// sqrt(PROBABILITY * numDocs) so the 6 sigma error, for which we expect 1 failure in
// 500M runs, is 6 * sqrt(PROBABILITY * numDocs / NUM_SAMPLE_RUNS).
double maxCountError = 6.0 * Math.sqrt(PROBABILITY * numDocs / NUM_SAMPLE_RUNS);
assertThat(Math.abs(sampledDocCount - expectedDocCount), lessThan(maxCountError));

SearchResponse trueValueResponse = prepareSearch("idx").addAggregation(avg("mean_monotonic").field(MONOTONIC_VALUE))
.addAggregation(avg("mean_numeric").field(NUMERIC_VALUE))
.get();
double trueMonotonic = ((Avg) trueValueResponse.getAggregations().get("mean_monotonic")).getValue();
double trueNumeric = ((Avg) trueValueResponse.getAggregations().get("mean_numeric")).getValue();
double maxMonotonicError = 6.0 * Math.sqrt(varMonotonic / (numDocs * PROBABILITY * NUM_SAMPLE_RUNS));
double maxNumericError = 6.0 * Math.sqrt(varNumeric / (numDocs * PROBABILITY * NUM_SAMPLE_RUNS));
assertThat(Math.abs(sampleMonotonicValue - trueMonotonic), lessThan(maxMonotonicError));
assertThat(Math.abs(sampleNumericValue - trueNumeric), lessThan(maxNumericError));
assertThat(Math.abs(sampledDocCount[0] - expectedDocCount), lessThan(maxCountError));

assertResponse(
prepareSearch("idx").addAggregation(avg("mean_monotonic").field(MONOTONIC_VALUE))
.addAggregation(avg("mean_numeric").field(NUMERIC_VALUE)),
response -> {
double trueMonotonic = ((Avg) response.getAggregations().get("mean_monotonic")).getValue();
double trueNumeric = ((Avg) response.getAggregations().get("mean_numeric")).getValue();
double maxMonotonicError = 6.0 * Math.sqrt(varMonotonic / (numDocs * PROBABILITY * NUM_SAMPLE_RUNS));
double maxNumericError = 6.0 * Math.sqrt(varNumeric / (numDocs * PROBABILITY * NUM_SAMPLE_RUNS));
assertThat(Math.abs(sampleMonotonicValue[0] - trueMonotonic), lessThan(maxMonotonicError));
assertThat(Math.abs(sampleNumericValue[0] - trueNumeric), lessThan(maxNumericError));
}
);
}

public void testRandomSamplerHistogram() {
Expand All @@ -129,54 +135,62 @@ public void testRandomSamplerHistogram() {
Map<String, Double> sampledDocCount = new HashMap<>();

for (int i = 0; i < NUM_SAMPLE_RUNS; i++) {
SearchRequest sampledRequest = prepareSearch("idx").addAggregation(
new RandomSamplerAggregationBuilder("sampler").setProbability(PROBABILITY)
.subAggregation(
histogram("histo").field(NUMERIC_VALUE)
.interval(5.0)
.subAggregation(avg("mean_monotonic").field(MONOTONIC_VALUE))
.subAggregation(avg("mean_numeric").field(NUMERIC_VALUE))
)
).request();
InternalRandomSampler sampler = client().search(sampledRequest).actionGet().getAggregations().get("sampler");
Histogram histo = sampler.getAggregations().get("histo");
for (Histogram.Bucket bucket : histo.getBuckets()) {
sampleMonotonicValue.compute(
bucket.getKeyAsString(),
(k, v) -> ((Avg) bucket.getAggregations().get("mean_monotonic")).getValue() + (v == null ? 0 : v)
);
sampleNumericValue.compute(
bucket.getKeyAsString(),
(k, v) -> ((Avg) bucket.getAggregations().get("mean_numeric")).getValue() + (v == null ? 0 : v)
);
sampledDocCount.compute(bucket.getKeyAsString(), (k, v) -> bucket.getDocCount() + (v == null ? 0 : v));
}
assertResponse(
prepareSearch("idx").addAggregation(
new RandomSamplerAggregationBuilder("sampler").setProbability(PROBABILITY)
.subAggregation(
histogram("histo").field(NUMERIC_VALUE)
.interval(5.0)
.subAggregation(avg("mean_monotonic").field(MONOTONIC_VALUE))
.subAggregation(avg("mean_numeric").field(NUMERIC_VALUE))
)
),
response -> {
InternalRandomSampler sampler = response.getAggregations().get("sampler");
Histogram histo = sampler.getAggregations().get("histo");
for (Histogram.Bucket bucket : histo.getBuckets()) {
sampleMonotonicValue.compute(
bucket.getKeyAsString(),
(k, v) -> ((Avg) bucket.getAggregations().get("mean_monotonic")).getValue() + (v == null ? 0 : v)
);
sampleNumericValue.compute(
bucket.getKeyAsString(),
(k, v) -> ((Avg) bucket.getAggregations().get("mean_numeric")).getValue() + (v == null ? 0 : v)
);
sampledDocCount.compute(bucket.getKeyAsString(), (k, v) -> bucket.getDocCount() + (v == null ? 0 : v));
}
}
);
}
for (String key : sampledDocCount.keySet()) {
sampledDocCount.put(key, sampledDocCount.get(key) / NUM_SAMPLE_RUNS);
sampleNumericValue.put(key, sampleNumericValue.get(key) / NUM_SAMPLE_RUNS);
sampleMonotonicValue.put(key, sampleMonotonicValue.get(key) / NUM_SAMPLE_RUNS);
}

SearchResponse trueValueResponse = prepareSearch("idx").addAggregation(
histogram("histo").field(NUMERIC_VALUE)
.interval(5.0)
.subAggregation(avg("mean_monotonic").field(MONOTONIC_VALUE))
.subAggregation(avg("mean_numeric").field(NUMERIC_VALUE))
).get();
Histogram histogram = trueValueResponse.getAggregations().get("histo");
for (Histogram.Bucket bucket : histogram.getBuckets()) {
long numDocs = bucket.getDocCount();
// Note the true count is estimated by dividing the bucket sample doc count by PROBABILITY.
double maxCountError = 6.0 * Math.sqrt(numDocs / NUM_SAMPLE_RUNS / (0.5 * PROBABILITY));
assertThat(Math.abs(sampledDocCount.get(bucket.getKeyAsString()) - numDocs), lessThan(maxCountError));
double trueMonotonic = ((Avg) bucket.getAggregations().get("mean_monotonic")).getValue();
double trueNumeric = ((Avg) bucket.getAggregations().get("mean_numeric")).getValue();
double maxMonotonicError = 6.0 * Math.sqrt(varMonotonic / (numDocs * 0.5 * PROBABILITY * NUM_SAMPLE_RUNS));
double maxNumericError = 6.0 * Math.sqrt(varNumeric / (numDocs * 0.5 * PROBABILITY * NUM_SAMPLE_RUNS));
assertThat(Math.abs(sampleMonotonicValue.get(bucket.getKeyAsString()) - trueMonotonic), lessThan(maxMonotonicError));
assertThat(Math.abs(sampleNumericValue.get(bucket.getKeyAsString()) - trueNumeric), lessThan(maxNumericError));
}
assertResponse(
prepareSearch("idx").addAggregation(
histogram("histo").field(NUMERIC_VALUE)
.interval(5.0)
.subAggregation(avg("mean_monotonic").field(MONOTONIC_VALUE))
.subAggregation(avg("mean_numeric").field(NUMERIC_VALUE))
),
response -> {
Histogram histogram = response.getAggregations().get("histo");
for (Histogram.Bucket bucket : histogram.getBuckets()) {
long numDocs = bucket.getDocCount();
// Note the true count is estimated by dividing the bucket sample doc count by PROBABILITY.
double maxCountError = 6.0 * Math.sqrt(numDocs / NUM_SAMPLE_RUNS / (0.5 * PROBABILITY));
assertThat(Math.abs(sampledDocCount.get(bucket.getKeyAsString()) - numDocs), lessThan(maxCountError));
double trueMonotonic = ((Avg) bucket.getAggregations().get("mean_monotonic")).getValue();
double trueNumeric = ((Avg) bucket.getAggregations().get("mean_numeric")).getValue();
double maxMonotonicError = 6.0 * Math.sqrt(varMonotonic / (numDocs * 0.5 * PROBABILITY * NUM_SAMPLE_RUNS));
double maxNumericError = 6.0 * Math.sqrt(varNumeric / (numDocs * 0.5 * PROBABILITY * NUM_SAMPLE_RUNS));
assertThat(Math.abs(sampleMonotonicValue.get(bucket.getKeyAsString()) - trueMonotonic), lessThan(maxMonotonicError));
assertThat(Math.abs(sampleNumericValue.get(bucket.getKeyAsString()) - trueNumeric), lessThan(maxNumericError));
}
}
);
}

}
Loading

0 comments on commit 182e749

Please sign in to comment.