From b43f2ab0d3b23df5a52d6936fa4f7e8ce9888f3a Mon Sep 17 00:00:00 2001 From: gf2121 <52390227+gf2121@users.noreply.github.com> Date: Fri, 9 Apr 2021 01:10:43 +0800 Subject: [PATCH] Use #updateTop to speed up InternalComposite#reduce (#71278) Co-authored-by: guofeng.my --- .../bucket/composite/InternalComposite.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java index 1e336d50c8f6a..639eb6605b020 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java @@ -9,6 +9,7 @@ package org.elasticsearch.search.aggregations.bucket.composite; import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.PriorityQueue; import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -29,7 +30,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.PriorityQueue; import java.util.Set; public class InternalComposite @@ -149,7 +149,12 @@ int[] getReverseMuls() { @Override public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { - PriorityQueue pq = new PriorityQueue<>(aggregations.size()); + PriorityQueue pq = new PriorityQueue<>(aggregations.size()) { + @Override + protected boolean lessThan(BucketIterator a, BucketIterator b) { + return a.compareTo(b) < 0; + } + }; boolean earlyTerminated = false; for (InternalAggregation agg : aggregations) { InternalComposite sortedAgg = (InternalComposite) agg; @@ -163,7 +168,7 @@ public InternalAggregation reduce(List aggregations, Reduce List buckets = new ArrayList<>(); List result = new ArrayList<>(); while (pq.size() > 0) { - BucketIterator bucketIt = pq.poll(); + BucketIterator bucketIt = pq.top(); if (lastBucket != null && bucketIt.current.compareKey(lastBucket) != 0) { InternalBucket reduceBucket = reduceBucket(buckets, reduceContext); buckets.clear(); @@ -175,7 +180,9 @@ public InternalAggregation reduce(List aggregations, Reduce lastBucket = bucketIt.current; buckets.add(bucketIt.current); if (bucketIt.next() != null) { - pq.add(bucketIt); + pq.updateTop(); + } else { + pq.pop(); } } if (buckets.size() > 0) {