Skip to content

Commit

Permalink
Add early termination support to BucketCollector (#33279)
Browse files Browse the repository at this point in the history
This commit adds the support to early terminate the collection of a leaf
in the aggregation framework. This change introduces a MultiBucketCollector which
handles CollectionTerminatedException exactly like the Lucene MultiCollector.
Any aggregator can now throw a CollectionTerminatedException without stopping
the collection of a sibling aggregator. This is useful for aggregators that
can infer their result without visiting all documents (e.g.: a min/max aggregation on a match_all query).
  • Loading branch information
jimczi committed Sep 3, 2018
1 parent 0434cb5 commit 588041a
Show file tree
Hide file tree
Showing 11 changed files with 483 additions and 68 deletions.
2 changes: 1 addition & 1 deletion docs/reference/search/profile.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ And the response:
]
},
{
"name": "BucketCollector: [[my_scoped_agg, my_global_agg]]",
"name": "MultiBucketCollector: [[my_scoped_agg, my_global_agg]]",
"reason": "aggregation",
"time_in_nanos": 8273
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void preProcess(SearchContext context) {
}
context.aggregations().aggregators(aggregators);
if (!collectors.isEmpty()) {
Collector collector = BucketCollector.wrap(collectors);
Collector collector = MultiBucketCollector.wrap(collectors);
((BucketCollector)collector).preCollection();
if (context.getProfilers() != null) {
collector = new InternalProfileCollector(collector, CollectorResult.REASON_AGGREGATION,
Expand Down Expand Up @@ -97,7 +97,7 @@ public void execute(SearchContext context) {

// optimize the global collector based execution
if (!globals.isEmpty()) {
BucketCollector globalsCollector = BucketCollector.wrap(globals);
BucketCollector globalsCollector = MultiBucketCollector.wrap(globals);
Query query = context.buildFilteredQuery(Queries.newMatchAllQuery());

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ protected void doPreCollection() throws IOException {
@Override
public final void preCollection() throws IOException {
List<BucketCollector> collectors = Arrays.asList(subAggregators);
collectableSubAggregators = BucketCollector.wrap(collectors);
collectableSubAggregators = MultiBucketCollector.wrap(collectors);
doPreCollection();
collectableSubAggregators.preCollection();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@
import org.apache.lucene.search.Collector;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.StreamSupport;

/**
* A Collector that can collect data in separate buckets.
Expand All @@ -54,61 +50,6 @@ public boolean needsScores() {
}
};

/**
* Wrap the given collectors into a single instance.
*/
public static BucketCollector wrap(Iterable<? extends BucketCollector> collectorList) {
final BucketCollector[] collectors =
StreamSupport.stream(collectorList.spliterator(), false).toArray(size -> new BucketCollector[size]);
switch (collectors.length) {
case 0:
return NO_OP_COLLECTOR;
case 1:
return collectors[0];
default:
return new BucketCollector() {

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
List<LeafBucketCollector> leafCollectors = new ArrayList<>(collectors.length);
for (BucketCollector c : collectors) {
leafCollectors.add(c.getLeafCollector(ctx));
}
return LeafBucketCollector.wrap(leafCollectors);
}

@Override
public void preCollection() throws IOException {
for (BucketCollector collector : collectors) {
collector.preCollection();
}
}

@Override
public void postCollection() throws IOException {
for (BucketCollector collector : collectors) {
collector.postCollection();
}
}

@Override
public boolean needsScores() {
for (BucketCollector collector : collectors) {
if (collector.needsScores()) {
return true;
}
}
return false;
}

@Override
public String toString() {
return Arrays.toString(collectors);
}
};
}
}

@Override
public abstract LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.aggregations;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.MultiCollector;
import org.apache.lucene.search.ScoreCachingWrappingScorer;
import org.apache.lucene.search.Scorer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
* A {@link BucketCollector} which allows running a bucket collection with several
* {@link BucketCollector}s. It is similar to the {@link MultiCollector} except that the
* {@link #wrap} method filters out the {@link BucketCollector#NO_OP_COLLECTOR}s and not
* the null ones.
*/
public class MultiBucketCollector extends BucketCollector {

/** See {@link #wrap(Iterable)}. */
public static BucketCollector wrap(BucketCollector... collectors) {
return wrap(Arrays.asList(collectors));
}

/**
* Wraps a list of {@link BucketCollector}s with a {@link MultiBucketCollector}. This
* method works as follows:
* <ul>
* <li>Filters out the {@link BucketCollector#NO_OP_COLLECTOR}s collectors, so they are not used
* during search time.
* <li>If the input contains 1 real collector, it is returned.
* <li>Otherwise the method returns a {@link MultiBucketCollector} which wraps the
* non-{@link BucketCollector#NO_OP_COLLECTOR} collectors.
* </ul>
*/
public static BucketCollector wrap(Iterable<? extends BucketCollector> collectors) {
// For the user's convenience, we allow NO_OP collectors to be passed.
// However, to improve performance, these null collectors are found
// and dropped from the array we save for actual collection time.
int n = 0;
for (BucketCollector c : collectors) {
if (c != NO_OP_COLLECTOR) {
n++;
}
}

if (n == 0) {
return NO_OP_COLLECTOR;
} else if (n == 1) {
// only 1 Collector - return it.
BucketCollector col = null;
for (BucketCollector c : collectors) {
if (c != null) {
col = c;
break;
}
}
return col;
} else {
BucketCollector[] colls = new BucketCollector[n];
n = 0;
for (BucketCollector c : collectors) {
if (c != null) {
colls[n++] = c;
}
}
return new MultiBucketCollector(colls);
}
}

private final boolean cacheScores;
private final BucketCollector[] collectors;

private MultiBucketCollector(BucketCollector... collectors) {
this.collectors = collectors;
int numNeedsScores = 0;
for (BucketCollector collector : collectors) {
if (collector.needsScores()) {
numNeedsScores += 1;
}
}
this.cacheScores = numNeedsScores >= 2;
}

@Override
public void preCollection() throws IOException {
for (BucketCollector collector : collectors) {
collector.preCollection();
}
}

@Override
public void postCollection() throws IOException {
for (BucketCollector collector : collectors) {
collector.postCollection();
}
}

@Override
public boolean needsScores() {
for (BucketCollector collector : collectors) {
if (collector.needsScores()) {
return true;
}
}
return false;
}

@Override
public String toString() {
return Arrays.toString(collectors);
}

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext context) throws IOException {
final List<LeafBucketCollector> leafCollectors = new ArrayList<>();
for (BucketCollector collector : collectors) {
final LeafBucketCollector leafCollector;
try {
leafCollector = collector.getLeafCollector(context);
} catch (CollectionTerminatedException e) {
// this leaf collector does not need this segment
continue;
}
leafCollectors.add(leafCollector);
}
switch (leafCollectors.size()) {
case 0:
throw new CollectionTerminatedException();
case 1:
return leafCollectors.get(0);
default:
return new MultiLeafBucketCollector(leafCollectors, cacheScores);
}
}

private static class MultiLeafBucketCollector extends LeafBucketCollector {

private final boolean cacheScores;
private final LeafBucketCollector[] collectors;
private int numCollectors;

private MultiLeafBucketCollector(List<LeafBucketCollector> collectors, boolean cacheScores) {
this.collectors = collectors.toArray(new LeafBucketCollector[collectors.size()]);
this.cacheScores = cacheScores;
this.numCollectors = this.collectors.length;
}

@Override
public void setScorer(Scorer scorer) throws IOException {
if (cacheScores) {
scorer = new ScoreCachingWrappingScorer(scorer);
}
for (int i = 0; i < numCollectors; ++i) {
final LeafCollector c = collectors[i];
c.setScorer(scorer);
}
}

private void removeCollector(int i) {
System.arraycopy(collectors, i + 1, collectors, i, numCollectors - i - 1);
--numCollectors;
collectors[numCollectors] = null;
}

@Override
public void collect(int doc, long bucket) throws IOException {
final LeafBucketCollector[] collectors = this.collectors;
int numCollectors = this.numCollectors;
for (int i = 0; i < numCollectors; ) {
final LeafBucketCollector collector = collectors[i];
try {
collector.collect(doc, bucket);
++i;
} catch (CollectionTerminatedException e) {
removeCollector(i);
numCollectors = this.numCollectors;
if (numCollectors == 0) {
throw new CollectionTerminatedException();
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.search.aggregations.BucketCollector;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.MultiBucketCollector;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
Expand Down Expand Up @@ -90,7 +91,7 @@ public boolean needsScores() {
/** Set the deferred collectors. */
@Override
public void setDeferredCollector(Iterable<BucketCollector> deferredCollectors) {
this.collector = BucketCollector.wrap(deferredCollectors);
this.collector = MultiBucketCollector.wrap(deferredCollectors);
}

private void finishLeaf() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.BucketCollector;
import org.elasticsearch.search.aggregations.MultiBucketCollector;
import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.internal.SearchContext;
Expand Down Expand Up @@ -59,7 +60,7 @@ protected void doPreCollection() throws IOException {
recordingWrapper.setDeferredCollector(deferredCollectors);
collectors.add(recordingWrapper);
}
collectableSubAggregators = BucketCollector.wrap(collectors);
collectableSubAggregators = MultiBucketCollector.wrap(collectors);
}

public static boolean descendsFromGlobalAggregator(Aggregator parent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.search.aggregations.BucketCollector;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.MultiBucketCollector;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
Expand Down Expand Up @@ -61,7 +62,7 @@ public MergingBucketsDeferringCollector(SearchContext context) {

@Override
public void setDeferredCollector(Iterable<BucketCollector> deferredCollectors) {
this.collector = BucketCollector.wrap(deferredCollectors);
this.collector = MultiBucketCollector.wrap(deferredCollectors);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.MultiBucketCollector;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
Expand Down Expand Up @@ -93,7 +94,7 @@ protected void doClose() {
@Override
protected void doPreCollection() throws IOException {
List<BucketCollector> collectors = Arrays.asList(subAggregators);
deferredCollectors = BucketCollector.wrap(collectors);
deferredCollectors = MultiBucketCollector.wrap(collectors);
collectableSubAggregators = BucketCollector.NO_OP_COLLECTOR;
}

Expand Down
Loading

0 comments on commit 588041a

Please sign in to comment.