Skip to content

Commit

Permalink
Use #updateTop to speed up InternalComposite#reduce (#71278)
Browse files Browse the repository at this point in the history
Co-authored-by: guofeng.my <guofeng.my@bytedance.com>
  • Loading branch information
gf2121 and gf2121 authored Apr 8, 2021
1 parent 5bcd1f3 commit b43f2ab
Showing 1 changed file with 11 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.search.aggregations.bucket.composite;

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -29,7 +30,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;

public class InternalComposite
Expand Down Expand Up @@ -149,7 +149,12 @@ int[] getReverseMuls() {

@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
PriorityQueue<BucketIterator> pq = new PriorityQueue<>(aggregations.size());
PriorityQueue<BucketIterator> pq = new PriorityQueue<>(aggregations.size()) {
@Override
protected boolean lessThan(BucketIterator a, BucketIterator b) {
return a.compareTo(b) < 0;
}
};
boolean earlyTerminated = false;
for (InternalAggregation agg : aggregations) {
InternalComposite sortedAgg = (InternalComposite) agg;
Expand All @@ -163,7 +168,7 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
List<InternalBucket> buckets = new ArrayList<>();
List<InternalBucket> result = new ArrayList<>();
while (pq.size() > 0) {
BucketIterator bucketIt = pq.poll();
BucketIterator bucketIt = pq.top();
if (lastBucket != null && bucketIt.current.compareKey(lastBucket) != 0) {
InternalBucket reduceBucket = reduceBucket(buckets, reduceContext);
buckets.clear();
Expand All @@ -175,7 +180,9 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
lastBucket = bucketIt.current;
buckets.add(bucketIt.current);
if (bucketIt.next() != null) {
pq.add(bucketIt);
pq.updateTop();
} else {
pq.pop();
}
}
if (buckets.size() > 0) {
Expand Down

0 comments on commit b43f2ab

Please sign in to comment.