Skip to content

Commit

Permalink
Clean up BoxplotAggregator to reuse AbstractTDigestPercentilesAggregator
Browse files Browse the repository at this point in the history
Refactors BoxplotAggregator to reuse the parts that are already available in
AbstractTDigestPercentilesAggregator.

Follow-up for elastic#51948.
  • Loading branch information
imotov committed Mar 10, 2020
1 parent 55f7246 commit 125e319
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import java.util.List;
import java.util.Map;

abstract class AbstractTDigestPercentilesAggregator extends NumericMetricsAggregator.MultiValue {
public abstract class AbstractTDigestPercentilesAggregator extends NumericMetricsAggregator.MultiValue {

private static int indexOfKey(double[] keys, double key) {
return ArrayUtils.binarySearch(keys, key, 0.001);
Expand All @@ -53,7 +53,7 @@ private static int indexOfKey(double[] keys, double key) {
protected final double compression;
protected final boolean keyed;

AbstractTDigestPercentilesAggregator(String name, ValuesSource valuesSource, SearchContext context, Aggregator parent,
protected AbstractTDigestPercentilesAggregator(String name, ValuesSource valuesSource, SearchContext context, Aggregator parent,
double[] keys, double compression, boolean keyed, DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
super(name, context, parent, pipelineAggregators, metaData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,10 @@

package org.elasticsearch.xpack.analytics.boxplot;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.index.fielddata.HistogramValue;
import org.elasticsearch.index.fielddata.HistogramValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator;
import org.elasticsearch.search.aggregations.metrics.AbstractTDigestPercentilesAggregator;
import org.elasticsearch.search.aggregations.metrics.TDigestState;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
Expand All @@ -29,79 +19,13 @@
import java.util.List;
import java.util.Map;

public class BoxplotAggregator extends NumericMetricsAggregator.MultiValue {

private final ValuesSource valuesSource;
private final DocValueFormat format;
protected ObjectArray<TDigestState> states;
protected final double compression;
public class BoxplotAggregator extends AbstractTDigestPercentilesAggregator {

BoxplotAggregator(String name, ValuesSource valuesSource, DocValueFormat formatter, double compression,
SearchContext context, Aggregator parent, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
super(name, context, parent, pipelineAggregators, metaData);
this.valuesSource = valuesSource;
this.format = formatter;
this.compression = compression;
if (valuesSource != null) {
states = context.bigArrays().newObjectArray(1);
}
}

@Override
public ScoreMode scoreMode() {
return valuesSource != null && valuesSource.needsScores() ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES;
}

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {
if (valuesSource == null) {
return LeafBucketCollector.NO_OP_COLLECTOR;
}
final BigArrays bigArrays = context.bigArrays();
if (valuesSource instanceof ValuesSource.Histogram) {
final HistogramValues values = ((ValuesSource.Histogram)valuesSource).getHistogramValues(ctx);
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
TDigestState state = getExistingOrNewHistogram(bigArrays, bucket);
if (values.advanceExact(doc)) {
final HistogramValue sketch = values.histogram();
while(sketch.next()) {
state.add(sketch.value(), sketch.count());
}
}
}
};
} else {
final SortedNumericDoubleValues values = ((ValuesSource.Numeric)valuesSource).doubleValues(ctx);
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
states = bigArrays.grow(states, bucket + 1);
if (values.advanceExact(doc)) {
TDigestState state = getExistingOrNewHistogram(bigArrays, bucket);
if (values.advanceExact(doc)) {
final int valueCount = values.docValueCount();
for (int i = 0; i < valueCount; i++) {
state.add(values.nextValue());
}
}
}
}
};
}
}

private TDigestState getExistingOrNewHistogram(final BigArrays bigArrays, long bucket) {
states = bigArrays.grow(states, bucket + 1);
TDigestState state = states.get(bucket);
if (state == null) {
state = new TDigestState(compression);
states.set(bucket, state);
}
return state;
super(name, valuesSource, context, parent, null, compression, false, formatter, pipelineAggregators,
metaData);
}

@Override
Expand All @@ -123,31 +47,18 @@ public double metric(String name, long owningBucketOrd) {
return InternalBoxplot.Metrics.resolve(name).value(state);
}


@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) {
TDigestState state = getState(owningBucketOrdinal);
if (state == null) {
return buildEmptyAggregation();
} else {
return new InternalBoxplot(name, state, format, pipelineAggregators(), metaData());
return new InternalBoxplot(name, state, formatter, pipelineAggregators(), metaData());
}
}

TDigestState getState(long bucketOrd) {
if (valuesSource == null || bucketOrd >= states.size()) {
return null;
}
return states.get(bucketOrd);
}

@Override
public InternalAggregation buildEmptyAggregation() {
return new InternalBoxplot(name, new TDigestState(compression), format, pipelineAggregators(), metaData());
}

@Override
public void doClose() {
Releasables.close(states);
return new InternalBoxplot(name, new TDigestState(compression), formatter, pipelineAggregators(), metaData());
}
}

0 comments on commit 125e319

Please sign in to comment.