From f933f80902fbaea31ce40667785d2e3a117f1af1 Mon Sep 17 00:00:00 2001
From: Simon Willnauer
Date: Tue, 21 Feb 2017 13:02:48 +0100
Subject: [PATCH] First step towards incremental reduction of query responses (#23253)

Today all query results are buffered up until we have received responses from all shards. This can hold on to a significant amount of memory if the number of shards is large. This commit adds a first step towards incrementally reducing aggregation results once a configurable (per search request) number of responses has been received. Once enough query results have been received and buffered, all aggregation responses received so far are reduced and released to be GCed.
---
 .../search/AbstractSearchAsyncAction.java | 34 ++---
 .../action/search/CountedCollector.java | 21 +--
 .../action/search/DfsQueryPhase.java | 12 +-
 .../action/search/FetchSearchPhase.java | 20 +-
 .../action/search/InitialSearchPhase.java | 51 +++++++
 .../SearchDfsQueryThenFetchAsyncAction.java | 7 +-
 .../action/search/SearchPhaseContext.java | 1 +
 .../action/search/SearchPhaseController.java | 144 ++++++++++++++++--
 .../SearchQueryThenFetchAsyncAction.java | 7 +-
 .../action/search/SearchRequest.java | 28 ++++
 .../action/search/SearchRequestBuilder.java | 5 +
 .../aggregations/InternalAggregation.java | 19 ++-
 .../bucket/geogrid/InternalGeoHashGrid.java | 2 +-
 .../histogram/InternalDateHistogram.java | 6 +-
 .../bucket/histogram/InternalHistogram.java | 6 +-
 .../significant/InternalSignificantTerms.java | 5 +-
 .../bucket/terms/InternalTerms.java | 7 +-
 .../scripted/InternalScriptedMetric.java | 33 ++--
 .../metrics/tophits/InternalTopHits.java | 15 +-
 .../action/search/CountedCollectorTests.java | 2 +-
 .../action/search/DfsQueryPhaseTests.java | 6 +-
 .../action/search/FetchSearchPhaseTests.java | 62 ++++----
 .../action/search/SearchAsyncActionTests.java | 12 +-
 .../search/SearchPhaseControllerTests.java | 104 +++++++++++++
 .../aggregations/AggregatorTestCase.java | 12 +-
 .../InternalAggregationTestCase.java | 11 +-
 .../aggregations/bucket/MinDocCountIT.java | 24 +--
 .../bucket/TermsDocCountErrorIT.java | 36 +++++
 .../SignificanceHeuristicTests.java | 7 +-
 .../bucket/terms/TermsAggregatorTests.java | 2 +-
 .../aggregations/metrics/TopHitsIT.java | 58 +++----
 .../metrics/avg/InternalAvgTests.java | 5 +-
 .../test/client/RandomizingClient.java | 6 +-
 33 files changed, 588 insertions(+), 182 deletions(-)

diff --git a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 962c52fbdc2ed..bf95b7517c6ed 100644 --- a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -42,7 +42,6 @@ import java.util.List; import java.util.Map; -import java.util.StringJoiner; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; @@ -61,7 +60,7 @@ abstract class AbstractSearchAsyncAction exten **/ private final Function nodeIdToConnection; private final SearchTask task; - private final AtomicArray results; + private final SearchPhaseResults results; private final long clusterStateVersion; private final Map aliasFilter; private final Map concreteIndexBoosts; @@ -76,7 +75,7 @@ protected AbstractSearchAsyncAction(String name, Logger logger, SearchTransportS Map aliasFilter, Map concreteIndexBoosts, Executor executor, SearchRequest request, ActionListener
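// A rough sketch of the new user-facing knob (illustrative only: the index and aggregation
// names are made up, while setReduceUpTo(int), default 512, minimum 2, is the API this
// patch adds to SearchRequest further below):
//
//   SearchRequest searchRequest = new SearchRequest("logs");
//   searchRequest.source(new SearchSourceBuilder()
//       .aggregation(AggregationBuilders.terms("hosts").field("hostname")));
//   searchRequest.setReduceUpTo(64); // reduce buffered agg results once 64 shard responses arrived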
listener, GroupShardsIterator shardsIts, long startTime, - long clusterStateVersion, SearchTask task) { + long clusterStateVersion, SearchTask task, SearchPhaseResults resultConsumer) { super(name, request, shardsIts, logger); this.startTime = startTime; this.logger = logger; @@ -87,9 +86,9 @@ protected AbstractSearchAsyncAction(String name, Logger logger, SearchTransportS this.listener = listener; this.nodeIdToConnection = nodeIdToConnection; this.clusterStateVersion = clusterStateVersion; - results = new AtomicArray<>(shardsIts.size()); this.concreteIndexBoosts = concreteIndexBoosts; this.aliasFilter = aliasFilter; + this.results = resultConsumer; } /** @@ -105,7 +104,7 @@ private long buildTookInMillis() { * This is the main entry point for a search. This method starts the search execution of the initial phase. */ public final void start() { - if (results.length() == 0) { + if (getNumShards() == 0) { //no search shards to search on, bail with empty response //(it happens with search across _all with no indices around and consistent with broadcast operations) listener.onResponse(new SearchResponse(InternalSearchResponse.empty(), null, 0, 0, buildTookInMillis(), @@ -130,8 +129,8 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha onPhaseFailure(currentPhase, "all shards failed", null); } else { if (logger.isTraceEnabled()) { - final String resultsFrom = results.asList().stream() - .map(r -> r.value.shardTarget().toString()).collect(Collectors.joining(",")); + final String resultsFrom = results.getSuccessfulResults() + .map(r -> r.shardTarget().toString()).collect(Collectors.joining(",")); logger.trace("[{}] Moving to next phase: [{}], based on results from: {} (cluster state version: {})", currentPhase.getName(), nextPhase.getName(), resultsFrom, clusterStateVersion); } @@ -178,7 +177,7 @@ public final void onShardFailure(final int shardIndex, @Nullable SearchShardTarg synchronized (shardFailuresMutex) { shardFailures = this.shardFailures.get(); // read again otherwise somebody else has created it? 
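// (a classic double-checked lazy initialization: the AtomicReference is read again under
// the mutex so that only one of the concurrently failing shards allocates the array)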
if (shardFailures == null) { // still null so we are the first and create a new instance - shardFailures = new AtomicArray<>(results.length()); + shardFailures = new AtomicArray<>(getNumShards()); this.shardFailures.set(shardFailures); } } @@ -194,7 +193,7 @@ public final void onShardFailure(final int shardIndex, @Nullable SearchShardTarg } } - if (results.get(shardIndex) != null) { + if (results.hasResult(shardIndex)) { assert failure == null : "shard failed before but shouldn't: " + failure; successfulOps.decrementAndGet(); // if this shard was successful before (initial phase) we have to adjust the counter } @@ -207,22 +206,22 @@ public final void onShardFailure(final int shardIndex, @Nullable SearchShardTarg * @param exception the exception explaining or causing the phase failure */ private void raisePhaseFailure(SearchPhaseExecutionException exception) { - for (AtomicArray.Entry entry : results.asList()) { + results.getSuccessfulResults().forEach((entry) -> { try { - Transport.Connection connection = nodeIdToConnection.apply(entry.value.shardTarget().getNodeId()); - sendReleaseSearchContext(entry.value.id(), connection); + Transport.Connection connection = nodeIdToConnection.apply(entry.shardTarget().getNodeId()); + sendReleaseSearchContext(entry.id(), connection); } catch (Exception inner) { inner.addSuppressed(exception); logger.trace("failed to release context", inner); } - } + }); listener.onFailure(exception); } @Override public final void onShardSuccess(int shardIndex, Result result) { successfulOps.incrementAndGet(); - results.set(shardIndex, result); + results.consumeResult(shardIndex, result); if (logger.isTraceEnabled()) { logger.trace("got first-phase result from {}", result != null ? result.shardTarget() : null); } @@ -242,7 +241,7 @@ public final void onPhaseDone() { @Override public final int getNumShards() { - return results.length(); + return results.getNumShards(); } @Override @@ -262,7 +261,7 @@ public final SearchRequest getRequest() { @Override public final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) { - return new SearchResponse(internalSearchResponse, scrollId, results.length(), successfulOps.get(), + return new SearchResponse(internalSearchResponse, scrollId, getNumShards(), successfulOps.get(), buildTookInMillis(), buildShardFailures()); } @@ -310,6 +309,5 @@ public final ShardSearchTransportRequest buildShardSearchRequest(ShardIterator s * executed shard request * @param context the search context for the next phase */ - protected abstract SearchPhase getNextPhase(AtomicArray results, SearchPhaseContext context); - + protected abstract SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context); } diff --git a/core/src/main/java/org/elasticsearch/action/search/CountedCollector.java b/core/src/main/java/org/elasticsearch/action/search/CountedCollector.java index be0ee2c161e24..65f2d2d280ba1 100644 --- a/core/src/main/java/org/elasticsearch/action/search/CountedCollector.java +++ b/core/src/main/java/org/elasticsearch/action/search/CountedCollector.java @@ -19,7 +19,6 @@ package org.elasticsearch.action.search; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; @@ -30,17 +29,13 @@ * where the given index is used to set the result on the array. 
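 * For intuition, the countdown contract in miniature (an illustrative sketch, not the
 * implementation below, which uses the CountDown utility): every expected operation
 * reports exactly once, as a result or as a failure, and the finish callback fires when
 * the count reaches zero.
 *
 *   AtomicInteger counter = new AtomicInteger(expectedOps);
 *   Runnable countDown = () -> {
 *       if (counter.decrementAndGet() == 0) {
 *           onFinish.run(); // all shards have reported back
 *       }
 *   };
 *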
*/ final class CountedCollector { - private final AtomicArray resultArray; + private final ResultConsumer resultConsumer; private final CountDown counter; private final Runnable onFinish; private final SearchPhaseContext context; - CountedCollector(AtomicArray resultArray, int expectedOps, Runnable onFinish, SearchPhaseContext context) { - if (expectedOps > resultArray.length()) { - throw new IllegalStateException("unexpected number of operations. got: " + expectedOps + " but array size is: " - + resultArray.length()); - } - this.resultArray = resultArray; + CountedCollector(ResultConsumer resultConsumer, int expectedOps, Runnable onFinish, SearchPhaseContext context) { + this.resultConsumer = resultConsumer; this.counter = new CountDown(expectedOps); this.onFinish = onFinish; this.context = context; @@ -63,7 +58,7 @@ void countDown() { void onResult(int index, R result, SearchShardTarget target) { try { result.shardTarget(target); - resultArray.set(index, result); + resultConsumer.consume(index, result); } finally { countDown(); } @@ -80,4 +75,12 @@ void onFailure(final int shardIndex, @Nullable SearchShardTarget shardTarget, Ex countDown(); } } + + /** + * A functional interface to plug in shard result consumers to this collector + */ + @FunctionalInterface + public interface ResultConsumer { + void consume(int shardIndex, R result); + } } diff --git a/core/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java b/core/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java index 5447b9eee8f10..0ac3c69b8ebc7 100644 --- a/core/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java +++ b/core/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java @@ -40,18 +40,19 @@ * @see CountedCollector#onFailure(int, SearchShardTarget, Exception) */ final class DfsQueryPhase extends SearchPhase { - private final AtomicArray queryResult; + private final InitialSearchPhase.SearchPhaseResults queryResult; private final SearchPhaseController searchPhaseController; private final AtomicArray dfsSearchResults; - private final Function, SearchPhase> nextPhaseFactory; + private final Function, SearchPhase> nextPhaseFactory; private final SearchPhaseContext context; private final SearchTransportService searchTransportService; DfsQueryPhase(AtomicArray dfsSearchResults, SearchPhaseController searchPhaseController, - Function, SearchPhase> nextPhaseFactory, SearchPhaseContext context) { + Function, SearchPhase> nextPhaseFactory, + SearchPhaseContext context) { super("dfs_query"); - this.queryResult = new AtomicArray<>(dfsSearchResults.length()); + this.queryResult = searchPhaseController.newSearchPhaseResults(context.getRequest(), context.getNumShards()); this.searchPhaseController = searchPhaseController; this.dfsSearchResults = dfsSearchResults; this.nextPhaseFactory = nextPhaseFactory; @@ -64,7 +65,8 @@ public void run() throws IOException { // TODO we can potentially also consume the actual per shard results from the initial phase here in the aggregateDfs // to free up memory early final AggregatedDfs dfs = searchPhaseController.aggregateDfs(dfsSearchResults); - final CountedCollector counter = new CountedCollector<>(queryResult, dfsSearchResults.asList().size(), + final CountedCollector counter = new CountedCollector<>(queryResult::consumeResult, + dfsSearchResults.asList().size(), () -> { context.executeNextPhase(this, nextPhaseFactory.apply(queryResult)); }, context); diff --git a/core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java 
b/core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java index 1215e97ae3ab1..20d91770675f7 100644 --- a/core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java +++ b/core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java @@ -49,29 +49,31 @@ final class FetchSearchPhase extends SearchPhase { private final Function nextPhaseFactory; private final SearchPhaseContext context; private final Logger logger; + private final InitialSearchPhase.SearchPhaseResults resultConsumer; - FetchSearchPhase(AtomicArray queryResults, + FetchSearchPhase(InitialSearchPhase.SearchPhaseResults resultConsumer, SearchPhaseController searchPhaseController, SearchPhaseContext context) { - this(queryResults, searchPhaseController, context, + this(resultConsumer, searchPhaseController, context, (response) -> new ExpandSearchPhase(context, response, // collapse only happens if the request has inner hits (finalResponse) -> sendResponsePhase(finalResponse, context))); } - FetchSearchPhase(AtomicArray queryResults, + FetchSearchPhase(InitialSearchPhase.SearchPhaseResults resultConsumer, SearchPhaseController searchPhaseController, SearchPhaseContext context, Function nextPhaseFactory) { super("fetch"); - if (context.getNumShards() != queryResults.length()) { + if (context.getNumShards() != resultConsumer.getNumShards()) { throw new IllegalStateException("number of shards must match the length of the query results but doesn't:" - + context.getNumShards() + "!=" + queryResults.length()); + + context.getNumShards() + "!=" + resultConsumer.getNumShards()); } - this.fetchResults = new AtomicArray<>(queryResults.length()); + this.fetchResults = new AtomicArray<>(resultConsumer.getNumShards()); this.searchPhaseController = searchPhaseController; - this.queryResults = queryResults; + this.queryResults = resultConsumer.results; this.nextPhaseFactory = nextPhaseFactory; this.context = context; this.logger = context.getLogger(); + this.resultConsumer = resultConsumer; } @@ -99,7 +101,7 @@ private void innerRun() throws IOException { ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(isScrollSearch, queryResults); String scrollId = isScrollSearch ? TransportSearchHelper.buildScrollId(queryResults) : null; List> queryResultsAsList = queryResults.asList(); - final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase(queryResultsAsList); + final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = resultConsumer.reduce(); final boolean queryAndFetchOptimization = queryResults.length() == 1; final Runnable finishPhase = () -> moveToNextPhase(searchPhaseController, sortedShardDocs, scrollId, reducedQueryPhase, queryAndFetchOptimization ? @@ -119,7 +121,7 @@ private void innerRun() throws IOException { final ScoreDoc[] lastEmittedDocPerShard = isScrollSearch ? 
searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, sortedShardDocs, numShards) : null; - final CountedCollector counter = new CountedCollector<>(fetchResults, + final CountedCollector counter = new CountedCollector<>(fetchResults::set, docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not finishPhase, context); for (int i = 0; i < docIdsToLoad.length; i++) { diff --git a/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java b/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java index dac215801fcea..f21e9d228d69f 100644 --- a/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java +++ b/core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java @@ -28,12 +28,14 @@ import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.transport.ConnectTransportException; import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; /** * This is an abstract base class that encapsulates the logic to fan out to all shards in provided {@link GroupShardsIterator} @@ -213,4 +215,53 @@ private void onShardResult(int shardIndex, String nodeId, FirstResult result, Sh * @param listener the listener to notify on response */ protected abstract void executePhaseOnShard(ShardIterator shardIt, ShardRouting shard, ActionListener listener); + + /** + * This class acts as a basic result collection that can be extended to do on-the-fly reduction or result processing + */ + static class SearchPhaseResults { + final AtomicArray results; + + SearchPhaseResults(int size) { + results = new AtomicArray<>(size); + } + + /** + * Returns the number of expected results this class should collect + */ + final int getNumShards() { + return results.length(); + } + + /** + * A stream of all non-null (successful) shard results + */ + final Stream getSuccessfulResults() { + return results.asList().stream().map(e -> e.value); + } + + /** + * Consumes a single shard result + * @param shardIndex the shard's index, a 0-based id that is used to establish a 1:1 mapping to the searched shards + * @param result the shard's result + */ + void consumeResult(int shardIndex, Result result) { + assert results.get(shardIndex) == null : "shardIndex: " + shardIndex + " is already set"; + results.set(shardIndex, result); + } + + /** + * Returns true iff a result is present for the given shard ID.
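 * As a sketch of the extension point this class provides (a hypothetical subclass, not
 * part of this patch; the QueryPhaseResultConsumer in SearchPhaseController below is the
 * real on-the-fly reduction example): a consumer can process results as they arrive
 * instead of merely storing them.
 *
 *   static final class CountingResults extends SearchPhaseResults<SearchPhaseResult> {
 *       private final AtomicInteger consumed = new AtomicInteger();
 *       CountingResults(int size) { super(size); }
 *       @Override
 *       void consumeResult(int shardIndex, SearchPhaseResult result) {
 *           super.consumeResult(shardIndex, result); // keep the 1:1 shard mapping
 *           consumed.incrementAndGet();              // on-the-fly processing goes here
 *       }
 *   }
 *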
+ */ + final boolean hasResult(int shardIndex) { + return results.get(shardIndex) != null; + } + + /** + * Reduces the collected results + */ + SearchPhaseController.ReducedQueryPhase reduce() { + throw new UnsupportedOperationException("reduce is not supported"); + } + } } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java index 2cf0c317d00a9..d846c42dbea5d 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java @@ -24,7 +24,6 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.dfs.DfsSearchResult; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.transport.Transport; @@ -43,7 +42,7 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction ActionListener listener, GroupShardsIterator shardsIts, long startTime, long clusterStateVersion, SearchTask task) { super("dfs", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, - request, listener, shardsIts, startTime, clusterStateVersion, task); + request, listener, shardsIts, startTime, clusterStateVersion, task, new SearchPhaseResults<>(shardsIts.size())); this.searchPhaseController = searchPhaseController; } @@ -54,8 +53,8 @@ protected void executePhaseOnShard(ShardIterator shardIt, ShardRouting shard, Ac } @Override - protected SearchPhase getNextPhase(AtomicArray results, SearchPhaseContext context) { - return new DfsQueryPhase(results, searchPhaseController, + protected SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context) { + return new DfsQueryPhase(results.results, searchPhaseController, (queryResults) -> new FetchSearchPhase(queryResults, searchPhaseController, context), context); } } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseContext.java b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseContext.java index 6786e60fd616e..1a21eb3cc3468 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseContext.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseContext.java @@ -114,4 +114,5 @@ default void sendReleaseSearchContext(long contextId, Transport.Connection conne * a response is returned to the user indicating that all shards have failed. 
*/ void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPhase); + } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index 5193fe7278417..fe39e74142612 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -44,6 +44,7 @@ import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.dfs.AggregatedDfs; import org.elasticsearch.search.dfs.DfsSearchResult; import org.elasticsearch.search.fetch.FetchSearchResult; @@ -461,23 +462,50 @@ private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFr /** * Reduces the given query results and consumes all aggregations and profile results. + * @param queryResults a list of non-null query shard results + */ + public final ReducedQueryPhase reducedQueryPhase(List> queryResults) { + return reducedQueryPhase(queryResults, null); + } + + /** + * Reduces the given query results and consumes all aggregations and profile results. + * @param queryResults a list of non-null query shard results + * @param bufferedAggs a list of pre-collected / buffered aggregations. If this list is non-null, all aggregations have been consumed + * from all non-null query results. * @see QuerySearchResult#consumeAggs() * @see QuerySearchResult#consumeProfileResult() */ - public final ReducedQueryPhase reducedQueryPhase(List> queryResults) { + private ReducedQueryPhase reducedQueryPhase(List> queryResults, + List bufferedAggs) { long totalHits = 0; long fetchHits = 0; float maxScore = Float.NEGATIVE_INFINITY; boolean timedOut = false; Boolean terminatedEarly = null; - if (queryResults.isEmpty()) { + if (queryResults.isEmpty()) { // early terminate, we have nothing to reduce return new ReducedQueryPhase(totalHits, fetchHits, maxScore, timedOut, terminatedEarly, null, null, null, null); } - QuerySearchResult firstResult = queryResults.get(0).value.queryResult(); + final QuerySearchResult firstResult = queryResults.get(0).value.queryResult(); final boolean hasSuggest = firstResult.suggest() != null; - final boolean hasAggs = firstResult.hasAggs(); final boolean hasProfileResults = firstResult.hasProfileResults(); - final List aggregationsList = hasAggs ? new ArrayList<>(queryResults.size()) : Collections.emptyList(); + final boolean consumeAggs; + final List aggregationsList; + if (bufferedAggs != null) { + consumeAggs = false; + // we already have results from intermediate reduces and just need to perform the final reduce + assert firstResult.hasAggs() : "firstResult has no aggs but we got non-null buffered aggs?"; + aggregationsList = bufferedAggs; + } else if (firstResult.hasAggs()) { + // the number of shards was less than the buffer size so we reduce agg results directly + aggregationsList = new ArrayList<>(queryResults.size()); + consumeAggs = true; + } else { + // no aggregations + aggregationsList = Collections.emptyList(); + consumeAggs = false; + } + // count the total (we use the query result provider here, since we might not get any hits (we scrolled past them)) final Map> groupedSuggestions = hasSuggest ?
new HashMap<>() : Collections.emptyMap(); final Map profileResults = hasProfileResults ? new HashMap<>(queryResults.size()) @@ -506,7 +534,7 @@ public final ReducedQueryPhase reducedQueryPhase(List aggregationsList) { + ReduceContext reduceContext = new ReduceContext(bigArrays, scriptService, false); + return aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList, + null, reduceContext); + } + private InternalAggregations reduceAggs(List aggregationsList, - List pipelineAggregators) { - ReduceContext reduceContext = new ReduceContext(bigArrays, scriptService); + List pipelineAggregators, ReduceContext reduceContext) { InternalAggregations aggregations = InternalAggregations.reduce(aggregationsList, reduceContext); if (pipelineAggregators != null) { List newAggs = StreamSupport.stream(aggregations.spliterator(), false) @@ -593,4 +632,91 @@ public boolean isEmpty() { } } + /** + * A {@link org.elasticsearch.action.search.InitialSearchPhase.SearchPhaseResults} implementation + * that incrementally reduces aggregation results as shard results are consumed. + * This implementation can be configured to batch up a certain number of results and only reduce them + * once the buffer is exhausted. + */ + static final class QueryPhaseResultConsumer + extends InitialSearchPhase.SearchPhaseResults { + private final InternalAggregations[] buffer; + private int index; + private final SearchPhaseController controller; + + /** + * Creates a new {@link QueryPhaseResultConsumer} + * @param controller a controller instance to reduce the query response objects + * @param expectedResultSize the expected number of query results. Corresponds to the number of shards queried + * @param bufferSize the size of the reduce buffer. If the buffer size is smaller than the number of expected results, + * the buffer is used to incrementally reduce aggregation results before all shards have responded. + */ + private QueryPhaseResultConsumer(SearchPhaseController controller, int expectedResultSize, int bufferSize) { + super(expectedResultSize); + if (expectedResultSize != 1 && bufferSize < 2) { + throw new IllegalArgumentException("buffer size must be >= 2 if there is more than one expected result"); + } + if (expectedResultSize <= bufferSize) { + throw new IllegalArgumentException("buffer size must be less than the expected result size"); + } + this.controller = controller; + // no need to buffer anything if we have fewer expected results. In this case we don't consume any results ahead of time.
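// The buffering strategy of consumeInternal below, in isolation (an illustrative sketch
// with a generic element type and a caller-supplied partial-reduce function; none of
// these names exist in this patch):
//
//   synchronized void consume(T next) {
//       if (index == buffer.length) {                  // buffer exhausted:
//           T folded = partialReduce.apply(Arrays.asList(buffer));
//           Arrays.fill(buffer, null);                 // release consumed partials to the GC
//           buffer[0] = folded;                        // keep only the folded element
//           index = 1;
//       }
//       buffer[index++] = next;                        // memory stays bounded by bufferSize
//   }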
+ this.buffer = new InternalAggregations[bufferSize]; + } + + @Override + public void consumeResult(int shardIndex, QuerySearchResultProvider result) { + super.consumeResult(shardIndex, result); + QuerySearchResult queryResult = result.queryResult(); + assert queryResult.hasAggs() : "this collector should only be used if aggs are requested"; + consumeInternal(queryResult); + } + + private synchronized void consumeInternal(QuerySearchResult querySearchResult) { + InternalAggregations aggregations = (InternalAggregations) querySearchResult.consumeAggs(); + if (index == buffer.length) { + InternalAggregations reducedAggs = controller.reduceAggsIncrementally(Arrays.asList(buffer)); + Arrays.fill(buffer, null); + buffer[0] = reducedAggs; + index = 1; + } + final int i = index++; + buffer[i] = aggregations; + } + + private synchronized List getRemaining() { + return Arrays.asList(buffer).subList(0, index); + } + + @Override + public ReducedQueryPhase reduce() { + return controller.reducedQueryPhase(results.asList(), getRemaining()); + } + + /** + * Returns the number of buffered results + */ + int getNumBuffered() { + return index; + } + } + + /** + * Returns a new SearchPhaseResults instance. This might return an instance that reduces search responses incrementally. + */ + InitialSearchPhase.SearchPhaseResults newSearchPhaseResults(SearchRequest request, int numShards) { + SearchSourceBuilder source = request.source(); + if (source != null && source.aggregations() != null) { + if (request.getReduceUpTo() < numShards) { + // only use this if there are aggs and if there are more shards than we should reduce at once + return new QueryPhaseResultConsumer(this, numShards, request.getReduceUpTo()); + } + } + return new InitialSearchPhase.SearchPhaseResults(numShards) { + @Override + public ReducedQueryPhase reduce() { + return reducedQueryPhase(results.asList()); + } + }; + } } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 8d4edfeb79f79..210a9aefda755 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -24,7 +24,6 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.transport.Transport; @@ -44,17 +43,19 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction results, SearchPhaseContext context) { + protected SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context) { return new FetchSearchPhase(results, searchPhaseController, context); } } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 9c69f1a763f38..0c1189d1d694e 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.search; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequest; import 
org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; @@ -70,6 +71,8 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest private Scroll scroll; + private int reduceUpTo = 512; + private String[] types = Strings.EMPTY_ARRAY; public static final IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.strictExpandOpenAndForbidClosed(); @@ -274,6 +277,25 @@ public Boolean requestCache() { return this.requestCache; } + /** + * Sets the number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection + * mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large. + */ + public void setReduceUpTo(int reduceUpTo) { + if (reduceUpTo <= 1) { + throw new IllegalArgumentException("reduceUpTo must be >= 2"); + } + this.reduceUpTo = reduceUpTo; + } + + /** + * Returns the number of shard results that should be reduced at once on the coordinating node. This value should be used as a + * protection mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large. + */ + public int getReduceUpTo() { + return reduceUpTo; + } + /** * @return true if the request only has suggest */ @@ -320,6 +342,9 @@ public void readFrom(StreamInput in) throws IOException { types = in.readStringArray(); indicesOptions = IndicesOptions.readIndicesOptions(in); requestCache = in.readOptionalBoolean(); + if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + reduceUpTo = in.readVInt(); + } } @Override @@ -337,6 +362,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeStringArray(types); indicesOptions.writeIndicesOptions(out); out.writeOptionalBoolean(requestCache); + if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + out.writeVInt(reduceUpTo); + } } @Override diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java index 865cf01430fb0..c6c5e0fbf3d60 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java @@ -523,4 +523,9 @@ private SearchSourceBuilder sourceBuilder() { } return request.source(); } + + public SearchRequestBuilder setReduceUpTo(int reduceUpTo) { + this.request.setReduceUpTo(reduceUpTo); + return this; + } } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java b/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java index 6af896426a798..563a958109be2 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java @@ -47,10 +47,21 @@ public static class ReduceContext { private final BigArrays bigArrays; private final ScriptService scriptService; + private final boolean isFinalReduce; - public ReduceContext(BigArrays bigArrays, ScriptService scriptService) { + public ReduceContext(BigArrays bigArrays, ScriptService scriptService, boolean isFinalReduce) { this.bigArrays = bigArrays; this.scriptService = scriptService; + this.isFinalReduce = isFinalReduce; + } + + /** + * Returns true iff the current reduce phase is the final reduce phase. 
This indicates whether operations like + pipeline aggregations should be applied and whether specific features like minDocCount should be taken into account. + Operations that are potentially losing information can only be applied during the final reduce phase. + */ + public boolean isFinalReduce() { + return isFinalReduce; } public BigArrays bigArrays() { @@ -111,8 +122,10 @@ public String getName() { */ public final InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { InternalAggregation aggResult = doReduce(aggregations, reduceContext); - for (PipelineAggregator pipelineAggregator : pipelineAggregators) { - aggResult = pipelineAggregator.reduce(aggResult, reduceContext); + if (reduceContext.isFinalReduce()) { + for (PipelineAggregator pipelineAggregator : pipelineAggregators) { + aggResult = pipelineAggregator.reduce(aggResult, reduceContext); + } } return aggResult; } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java index 085f18c0e1e78..ef268f8a5049c 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoHashGrid.java @@ -192,7 +192,7 @@ public InternalGeoHashGrid doReduce(List aggregations, Redu } } - final int size = (int) Math.min(requiredSize, buckets.size()); + final int size = Math.toIntExact(reduceContext.isFinalReduce() == false ? buckets.size() : Math.min(requiredSize, buckets.size())); BucketPriorityQueue ordered = new BucketPriorityQueue(size); for (LongObjectPagedHashMap.Cursor> cursor : buckets) { List sameCellBuckets = cursor.value; diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java index f24fc5c127ec6..a8976aaa1ac77 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java @@ -285,7 +285,7 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { if (top.current.key != key) { // the key changes, reduce what we already buffered and reset the buffer for current buckets final Bucket reduced = currentBuckets.get(0).reduce(currentBuckets, reduceContext); - if (reduced.getDocCount() >= minDocCount) { + if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) { reducedBuckets.add(reduced); } currentBuckets.clear(); @@ -306,7 +306,7 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { if (currentBuckets.isEmpty() == false) { final Bucket reduced = currentBuckets.get(0).reduce(currentBuckets, reduceContext); - if (reduced.getDocCount() >= minDocCount) { + if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) { reducedBuckets.add(reduced); } } @@ -382,7 +382,7 @@ public InternalAggregation doReduce(List aggregations, Redu addEmptyBuckets(reducedBuckets, reduceContext); } - if (order == InternalOrder.KEY_ASC) { + if (order == InternalOrder.KEY_ASC || reduceContext.isFinalReduce() == false) { // nothing to do, data are already sorted since shards return // sorted buckets and the merge-sort performed by reduceBuckets // maintains order diff --git
a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java index eb90dfae732ad..e6e23d3a615a1 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java @@ -308,7 +308,7 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { if (top.current.key != key) { // the key changes, reduce what we already buffered and reset the buffer for current buckets final Bucket reduced = currentBuckets.get(0).reduce(currentBuckets, reduceContext); - if (reduced.getDocCount() >= minDocCount) { + if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) { reducedBuckets.add(reduced); } currentBuckets.clear(); @@ -329,7 +329,7 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) { if (currentBuckets.isEmpty() == false) { final Bucket reduced = currentBuckets.get(0).reduce(currentBuckets, reduceContext); - if (reduced.getDocCount() >= minDocCount) { + if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) { reducedBuckets.add(reduced); } } @@ -400,7 +400,7 @@ public InternalAggregation doReduce(List aggregations, Redu addEmptyBuckets(reducedBuckets, reduceContext); } - if (order == InternalOrder.KEY_ASC) { + if (order == InternalOrder.KEY_ASC || reduceContext.isFinalReduce() == false) { // nothing to do, data are already sorted since shards return // sorted buckets and the merge-sort performed by reduceBuckets // maintains order diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java index cdd1f8d19a7de..6fcee8e937e53 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/significant/InternalSignificantTerms.java @@ -196,15 +196,14 @@ public InternalAggregation doReduce(List aggregations, Redu bucket.aggregations)); } } - SignificanceHeuristic heuristic = getSignificanceHeuristic().rewrite(reduceContext); - final int size = Math.min(requiredSize, buckets.size()); + final int size = reduceContext.isFinalReduce() == false ? 
buckets.size() : Math.min(requiredSize, buckets.size()); BucketSignificancePriorityQueue ordered = new BucketSignificancePriorityQueue<>(size); for (Map.Entry> entry : buckets.entrySet()) { List sameTermBuckets = entry.getValue(); final B b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext); b.updateScore(heuristic); - if ((b.score > 0) && (b.subsetDf >= minDocCount)) { + if (((b.score > 0) && (b.subsetDf >= minDocCount)) || reduceContext.isFinalReduce() == false) { ordered.insertWithOverflow(b); } } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java index 961e0a9228066..938b20d9fc892 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java @@ -33,6 +33,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -248,8 +249,8 @@ public InternalAggregation doReduce(List aggregations, Redu } } - final int size = Math.min(requiredSize, buckets.size()); - BucketPriorityQueue ordered = new BucketPriorityQueue<>(size, order.comparator(null)); + final int size = reduceContext.isFinalReduce() == false ? buckets.size() : Math.min(requiredSize, buckets.size()); + final BucketPriorityQueue ordered = new BucketPriorityQueue<>(size, order.comparator(null)); for (List sameTermBuckets : buckets.values()) { final B b = sameTermBuckets.get(0).reduce(sameTermBuckets, reduceContext); if (b.docCountError != -1) { @@ -259,7 +260,7 @@ public InternalAggregation doReduce(List aggregations, Redu b.docCountError = sumDocCountError - b.docCountError; } } - if (b.docCount >= minDocCount) { + if (b.docCount >= minDocCount || reduceContext.isFinalReduce() == false) { B removed = ordered.insertWithOverflow(b); if (removed != null) { otherDocCount += removed.getDocCount(); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/scripted/InternalScriptedMetric.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/scripted/InternalScriptedMetric.java index 6cb3b626f9185..174ff8ca28dc1 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/scripted/InternalScriptedMetric.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/scripted/InternalScriptedMetric.java @@ -39,9 +39,14 @@ public class InternalScriptedMetric extends InternalMetricsAggregation implements ScriptedMetric { private final Script reduceScript; - private final Object aggregation; + private final List aggregation; public InternalScriptedMetric(String name, Object aggregation, Script reduceScript, List pipelineAggregators, + Map metaData) { + this(name, Collections.singletonList(aggregation), reduceScript, pipelineAggregators, metaData); + } + + private InternalScriptedMetric(String name, List aggregation, Script reduceScript, List pipelineAggregators, Map metaData) { super(name, pipelineAggregators, metaData); this.aggregation = aggregation; @@ -54,13 +59,13 @@ public InternalScriptedMetric(String name, Object aggregation, Script reduceScri public InternalScriptedMetric(StreamInput in) throws IOException { super(in); reduceScript = in.readOptionalWriteable(Script::new); - aggregation = in.readGenericValue(); + aggregation = 
Collections.singletonList(in.readGenericValue()); } @Override protected void doWriteTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(reduceScript); - out.writeGenericValue(aggregation); + out.writeGenericValue(aggregation()); } @Override @@ -70,7 +75,10 @@ public String getWriteableName() { @Override public Object aggregation() { - return aggregation; + if (aggregation.size() != 1) { + throw new IllegalStateException("aggregation was not reduced"); + } + return aggregation.get(0); } @Override @@ -78,11 +86,11 @@ public InternalAggregation doReduce(List aggregations, Redu List aggregationObjects = new ArrayList<>(); for (InternalAggregation aggregation : aggregations) { InternalScriptedMetric mapReduceAggregation = (InternalScriptedMetric) aggregation; - aggregationObjects.add(mapReduceAggregation.aggregation()); + aggregationObjects.addAll(mapReduceAggregation.aggregation); } InternalScriptedMetric firstAggregation = ((InternalScriptedMetric) aggregations.get(0)); - Object aggregation; - if (firstAggregation.reduceScript != null) { + List aggregation; + if (firstAggregation.reduceScript != null && reduceContext.isFinalReduce()) { Map vars = new HashMap<>(); vars.put("_aggs", aggregationObjects); if (firstAggregation.reduceScript.getParams() != null) { @@ -91,13 +99,16 @@ public InternalAggregation doReduce(List aggregations, Redu CompiledScript compiledScript = reduceContext.scriptService().compile( firstAggregation.reduceScript, ScriptContext.Standard.AGGS); ExecutableScript script = reduceContext.scriptService().executable(compiledScript, vars); - aggregation = script.run(); + aggregation = Collections.singletonList(script.run()); + } else if (reduceContext.isFinalReduce()) { + aggregation = Collections.singletonList(aggregationObjects); } else { + // if this is not the final reduce we have to maintain all the aggs from all the incoming ones + // until we hit the final reduce phase.
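// worked example (illustrative values): if three shards produce [a], [b] and [c], a
// partial reduce of the first two must yield [a, b], not a script result, so that the
// final reduce can run the reduce script over [a, b, c] exactly once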
aggregation = aggregationObjects; } return new InternalScriptedMetric(firstAggregation.getName(), aggregation, firstAggregation.reduceScript, pipelineAggregators(), getMetaData()); - } @Override @@ -105,7 +116,7 @@ public Object getProperty(List path) { if (path.isEmpty()) { return this; } else if (path.size() == 1 && "value".equals(path.get(0))) { - return aggregation; + return aggregation(); } else { throw new IllegalArgumentException("path not supported for [" + getName() + "]: " + path); } @@ -113,7 +124,7 @@ public Object getProperty(List path) { @Override public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { - return builder.field("value", aggregation); + return builder.field("value", aggregation()); } } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/InternalTopHits.java b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/InternalTopHits.java index baa8c45e140ca..81216af23e7f9 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/InternalTopHits.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/InternalTopHits.java @@ -96,7 +96,18 @@ int getSize() { @Override public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { - SearchHits[] shardHits = new SearchHits[aggregations.size()]; + final SearchHits[] shardHits = new SearchHits[aggregations.size()]; + final int from; + final int size; + if (reduceContext.isFinalReduce()) { + from = this.from; + size = this.size; + } else { + // if we are not in the final reduce we need to ensure we maintain all possible elements during reduce + // hence for pagination we need to maintain all hits until we are in the final phase. 
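// e.g. with from = 10 and size = 5 (illustrative numbers), a partial reduce must keep
// the top from + size = 15 hits, since any of them may still make the final page once
// all shards are merged; only the final reduce applies the offset of 10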
+ from = 0; + size = this.from + this.size; + } final TopDocs reducedTopDocs; final TopDocs[] shardDocs; @@ -130,7 +141,7 @@ public InternalAggregation doReduce(List aggregations, Redu } while (shardDocs[scoreDoc.shardIndex].scoreDocs[position] != scoreDoc); hits[i] = shardHits[scoreDoc.shardIndex].getAt(position); } - return new InternalTopHits(name, from, size, reducedTopDocs, new SearchHits(hits, reducedTopDocs.totalHits, + return new InternalTopHits(name, this.from, this.size, reducedTopDocs, new SearchHits(hits, reducedTopDocs.totalHits, reducedTopDocs.getMaxScore()), pipelineAggregators(), getMetaData()); } diff --git a/core/src/test/java/org/elasticsearch/action/search/CountedCollectorTests.java b/core/src/test/java/org/elasticsearch/action/search/CountedCollectorTests.java index d34bf180e806c..6995ad93f25fd 100644 --- a/core/src/test/java/org/elasticsearch/action/search/CountedCollectorTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/CountedCollectorTests.java @@ -46,7 +46,7 @@ public void testCollect() throws InterruptedException { runnable.run(); } }; - CountedCollector collector = new CountedCollector<>(results, numResultsExpected, + CountedCollector collector = new CountedCollector<>(results::set, numResultsExpected, latch::countDown, context); for (int i = 0; i < numResultsExpected; i++) { int shardID = i; diff --git a/core/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java b/core/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java index f094db086f84e..ba01559e0f063 100644 --- a/core/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java @@ -95,7 +95,7 @@ public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest (response) -> new SearchPhase("test") { @Override public void run() throws IOException { - responseRef.set(response); + responseRef.set(response.results); } }, mockSearchPhaseContext); assertEquals("dfs_query", phase.getName()); @@ -147,7 +147,7 @@ public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest (response) -> new SearchPhase("test") { @Override public void run() throws IOException { - responseRef.set(response); + responseRef.set(response.results); } }, mockSearchPhaseContext); assertEquals("dfs_query", phase.getName()); @@ -202,7 +202,7 @@ public void sendExecuteQuery(Transport.Connection connection, QuerySearchRequest (response) -> new SearchPhase("test") { @Override public void run() throws IOException { - responseRef.set(response); + responseRef.set(response.results); } }, mockSearchPhaseContext); assertEquals("dfs_query", phase.getName()); diff --git a/core/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java b/core/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java index 67a67f720e14a..14c2eb6f63fd2 100644 --- a/core/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java @@ -46,7 +46,10 @@ public class FetchSearchPhaseTests extends ESTestCase { public void testShortcutQueryAndFetchOptimization() throws IOException { - AtomicArray results = new AtomicArray<>(1); + SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null); + MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1); + InitialSearchPhase.SearchPhaseResults results = + 
controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 1); AtomicReference responseRef = new AtomicReference<>(); boolean hasHits = randomBoolean(); final int numHits; @@ -56,14 +59,12 @@ public void testShortcutQueryAndFetchOptimization() throws IOException { queryResult.size(1); FetchSearchResult fetchResult = new FetchSearchResult(); fetchResult.hits(new SearchHits(new SearchHit[] {new SearchHit(42)}, 1, 1.0F)); - results.set(0, new QueryFetchSearchResult(queryResult, fetchResult)); + results.consumeResult(0, new QueryFetchSearchResult(queryResult, fetchResult)); numHits = 1; } else { numHits = 0; } - SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null); - MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1); FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext, (searchResponse) -> new SearchPhase("test") { @Override @@ -83,20 +84,22 @@ public void run() throws IOException { } public void testFetchTwoDocument() throws IOException { - AtomicArray results = new AtomicArray<>(2); + MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2); + SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null); + InitialSearchPhase.SearchPhaseResults results = + controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2); AtomicReference responseRef = new AtomicReference<>(); int resultSetSize = randomIntBetween(2, 10); QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0)); queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]); queryResult.size(resultSetSize); // the size of the result set - results.set(0, queryResult); + results.consumeResult(0, queryResult); queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new Index("test", "na"), 1)); queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(84, 2.0F)}, 2.0F), new DocValueFormat[0]); queryResult.size(resultSetSize); - results.set(1, queryResult); + results.consumeResult(1, queryResult); - SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null); SearchTransportService searchTransportService = new SearchTransportService( Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override @@ -112,7 +115,6 @@ public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRe listener.onResponse(fetchResult); } }; - MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2); mockSearchPhaseContext.searchTransport = searchTransportService; FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext, (searchResponse) -> new SearchPhase("test") { @@ -134,20 +136,22 @@ public void run() throws IOException { } public void testFailFetchOneDoc() throws IOException { - AtomicArray results = new AtomicArray<>(2); + MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2); + SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null); + InitialSearchPhase.SearchPhaseResults results = + controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2); AtomicReference responseRef = new AtomicReference<>(); int resultSetSize = randomIntBetween(2, 10); QuerySearchResult 
queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0)); queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]); queryResult.size(resultSetSize); // the size of the result set - results.set(0, queryResult); + results.consumeResult(0, queryResult); queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new Index("test", "na"), 1)); queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(84, 2.0F)}, 2.0F), new DocValueFormat[0]); queryResult.size(resultSetSize); - results.set(1, queryResult); + results.consumeResult(1, queryResult); - SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null); SearchTransportService searchTransportService = new SearchTransportService( Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override @@ -163,7 +167,6 @@ public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRe } }; - MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2); mockSearchPhaseContext.searchTransport = searchTransportService; FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext, (searchResponse) -> new SearchPhase("test") { @@ -190,15 +193,17 @@ public void testFetchDocsConcurrently() throws IOException, InterruptedException int resultSetSize = randomIntBetween(0, 100); // we use at least 2 hits otherwise this is subject to single shard optimization and we trip an assert... int numHits = randomIntBetween(2, 100); // also numshards --> 1 hit per shard - AtomicArray results = new AtomicArray<>(numHits); + SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null); + MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(numHits); + InitialSearchPhase.SearchPhaseResults results = + controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), numHits); AtomicReference responseRef = new AtomicReference<>(); for (int i = 0; i < numHits; i++) { QuerySearchResult queryResult = new QuerySearchResult(i, new SearchShardTarget("node1", new Index("test", "na"), 0)); queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(i+1, i)}, i), new DocValueFormat[0]); queryResult.size(resultSetSize); // the size of the result set - results.set(i, queryResult); + results.consumeResult(i, queryResult); } - SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null); SearchTransportService searchTransportService = new SearchTransportService( Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override @@ -211,7 +216,6 @@ public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRe }).start(); } }; - MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(numHits); mockSearchPhaseContext.searchTransport = searchTransportService; CountDownLatch latch = new CountDownLatch(1); FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext, @@ -243,20 +247,22 @@ public void run() throws IOException { } public void testExceptionFailsPhase() throws IOException { - AtomicArray results = new AtomicArray<>(2); + MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2); + SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, 
BigArrays.NON_RECYCLING_INSTANCE, null); + InitialSearchPhase.SearchPhaseResults results = + controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2); AtomicReference responseRef = new AtomicReference<>(); int resultSetSize = randomIntBetween(2, 10); QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0)); queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]); queryResult.size(resultSetSize); // the size of the result set - results.set(0, queryResult); + results.consumeResult(0, queryResult); queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new Index("test", "na"), 1)); queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(84, 2.0F)}, 2.0F), new DocValueFormat[0]); queryResult.size(resultSetSize); - results.set(1, queryResult); + results.consumeResult(1, queryResult); AtomicInteger numFetches = new AtomicInteger(0); - SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null); SearchTransportService searchTransportService = new SearchTransportService( Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override @@ -275,7 +281,6 @@ public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRe listener.onResponse(fetchResult); } }; - MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2); mockSearchPhaseContext.searchTransport = searchTransportService; FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext, (searchResponse) -> new SearchPhase("test") { @@ -293,20 +298,22 @@ public void run() throws IOException { } public void testCleanupIrrelevantContexts() throws IOException { // contexts that are not fetched should be cleaned up - AtomicArray results = new AtomicArray<>(2); + MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2); + SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null); + InitialSearchPhase.SearchPhaseResults results = + controller.newSearchPhaseResults(mockSearchPhaseContext.getRequest(), 2); AtomicReference responseRef = new AtomicReference<>(); int resultSetSize = 1; QuerySearchResult queryResult = new QuerySearchResult(123, new SearchShardTarget("node1", new Index("test", "na"), 0)); queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(42, 1.0F)}, 2.0F), new DocValueFormat[0]); queryResult.size(resultSetSize); // the size of the result set - results.set(0, queryResult); + results.consumeResult(0, queryResult); queryResult = new QuerySearchResult(321, new SearchShardTarget("node2", new Index("test", "na"), 1)); queryResult.topDocs(new TopDocs(1, new ScoreDoc[] {new ScoreDoc(84, 2.0F)}, 2.0F), new DocValueFormat[0]); queryResult.size(resultSetSize); - results.set(1, queryResult); + results.consumeResult(1, queryResult); - SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY, BigArrays.NON_RECYCLING_INSTANCE, null); SearchTransportService searchTransportService = new SearchTransportService( Settings.builder().put("search.remote.connect", false).build(), null, null) { @Override @@ -321,7 +328,6 @@ public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRe listener.onResponse(fetchResult); } }; - MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(2); mockSearchPhaseContext.searchTransport = 
searchTransportService; FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext, (searchResponse) -> new SearchPhase("test") { diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index 972d0957dce7c..9b7fad265bfb3 100644 --- a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -32,13 +32,11 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.AliasFilter; -import org.elasticsearch.search.internal.ShardSearchTransportRequest; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportException; @@ -53,7 +51,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -96,7 +93,8 @@ public void sendFreeContext(Transport.Connection connection, long contextId, Sea lookup.put(replicaNode.getId(), new MockConnection(replicaNode)); Map aliasFilters = Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)); AbstractSearchAsyncAction asyncAction = new AbstractSearchAsyncAction("test", logger, transportService, - lookup::get, aliasFilters, Collections.emptyMap(), null, request, responseListener, shardsIter, 0, 0, null) { + lookup::get, aliasFilters, Collections.emptyMap(), null, request, responseListener, shardsIter, 0, 0, null, + new InitialSearchPhase.SearchPhaseResults<>(shardsIter.size())) { TestSearchResponse response = new TestSearchResponse(); @Override @@ -115,12 +113,12 @@ protected void executePhaseOnShard(ShardIterator shardIt, ShardRouting shard, Ac } @Override - protected SearchPhase getNextPhase(AtomicArray results, SearchPhaseContext context) { + protected SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context) { return new SearchPhase("test") { @Override public void run() throws IOException { - for (int i = 0; i < results.length(); i++) { - TestSearchPhaseResult result = results.get(i); + for (int i = 0; i < results.getNumShards(); i++) { + TestSearchPhaseResult result = results.results.get(i); assertEquals(result.node.getId(), result.shardTarget().getNodeId()); sendReleaseSearchContext(result.id(), new MockConnection(result.node)); } diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java index 1686a3c6de27d..21d5b6aee9063 100644 --- a/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java @@ -26,7 +26,14 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.index.Index; +import 
org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.metrics.max.InternalMax; +import org.elasticsearch.search.aggregations.metrics.sum.InternalSum; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; @@ -45,10 +52,13 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.not; public class SearchPhaseControllerTests extends ESTestCase { private SearchPhaseController searchPhaseController; @@ -230,4 +240,98 @@ private AtomicArray generateFetchResults(int nShards, } return fetchResults; } + + public void testConsumer() { + int bufferSize = randomIntBetween(2, 3); + SearchRequest request = new SearchRequest(); + request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo"))); + request.setReduceUpTo(bufferSize); + InitialSearchPhase.SearchPhaseResults consumer = searchPhaseController.newSearchPhaseResults(request, 3); + QuerySearchResult result = new QuerySearchResult(0, new SearchShardTarget("node", new Index("a", "b"), 0)); + result.topDocs(new TopDocs(0, new ScoreDoc[0], 0.0F), new DocValueFormat[0]); + InternalAggregations aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", 1.0D, DocValueFormat.RAW, + Collections.emptyList(), Collections.emptyMap()))); + result.aggregations(aggs); + consumer.consumeResult(0, result); + + result = new QuerySearchResult(1, new SearchShardTarget("node", new Index("a", "b"), 0)); + result.topDocs(new TopDocs(0, new ScoreDoc[0], 0.0F), new DocValueFormat[0]); + aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", 3.0D, DocValueFormat.RAW, + Collections.emptyList(), Collections.emptyMap()))); + result.aggregations(aggs); + consumer.consumeResult(2, result); + + result = new QuerySearchResult(1, new SearchShardTarget("node", new Index("a", "b"), 0)); + result.topDocs(new TopDocs(0, new ScoreDoc[0], 0.0F), new DocValueFormat[0]); + aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", 2.0D, DocValueFormat.RAW, + Collections.emptyList(), Collections.emptyMap()))); + result.aggregations(aggs); + consumer.consumeResult(1, result); + + if (bufferSize == 2) { + assertThat(consumer, instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class)); + assertEquals(2, ((SearchPhaseController.QueryPhaseResultConsumer)consumer).getNumBuffered()); + } else { + assertThat(consumer, not(instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class))); + } + + SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce(); + InternalMax max = (InternalMax) reduce.aggregations.asList().get(0); + assertEquals(3.0D, max.getValue(), 0.0D); + } + + public void testConsumerConcurrently() throws InterruptedException { + int expectedNumResults = randomIntBetween(1, 100); + int bufferSize = randomIntBetween(2, 200); + + SearchRequest request = new SearchRequest(); + request.source(new 
SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo"))); + request.setReduceUpTo(bufferSize); + InitialSearchPhase.SearchPhaseResults consumer = + searchPhaseController.newSearchPhaseResults(request, expectedNumResults); + AtomicInteger max = new AtomicInteger(); + CountDownLatch latch = new CountDownLatch(expectedNumResults); + for (int i = 0; i < expectedNumResults; i++) { + int id = i; + Thread t = new Thread(() -> { + int number = randomIntBetween(1, 1000); + max.updateAndGet(prev -> Math.max(prev, number)); + QuerySearchResult result = new QuerySearchResult(id, new SearchShardTarget("node", new Index("a", "b"), id)); + result.topDocs(new TopDocs(id, new ScoreDoc[0], 0.0F), new DocValueFormat[0]); + InternalAggregations aggs = new InternalAggregations(Arrays.asList(new InternalMax("test", (double) number, + DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap()))); + result.aggregations(aggs); + consumer.consumeResult(id, result); + latch.countDown(); + + }); + t.start(); + } + latch.await(); + SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce(); + InternalMax internalMax = (InternalMax) reduce.aggregations.asList().get(0); + assertEquals(max.get(), internalMax.getValue(), 0.0D); + } + + public void testNewSearchPhaseResults() { + for (int i = 0; i < 10; i++) { + int expectedNumResults = randomIntBetween(1, 10); + int bufferSize = randomIntBetween(2, 10); + SearchRequest request = new SearchRequest(); + final boolean hasAggs; + if ((hasAggs = randomBoolean())) { + request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo"))); + } + request.setReduceUpTo(bufferSize); + InitialSearchPhase.SearchPhaseResults consumer + = searchPhaseController.newSearchPhaseResults(request, expectedNumResults); + if (hasAggs && expectedNumResults > bufferSize) { + assertThat("expectedNumResults: " + expectedNumResults + " bufferSize: " + bufferSize, + consumer, instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class)); + } else { + assertThat("expectedNumResults: " + expectedNumResults + " bufferSize: " + bufferSize, + consumer, not(instanceOf(SearchPhaseController.QueryPhaseResultConsumer.class))); + } + } + } } diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/core/src/test/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index aab44c32fcf0d..296a2ec461114 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -180,8 +180,18 @@ protected A searchAndReduc if (aggs.isEmpty()) { return null; } else { + if (aggs.size() > 2 && randomBoolean()) { + // sometimes do an incremental reduce + List internalAggregations = randomSubsetOf(randomIntBetween(2, aggs.size()-1), aggs); + A internalAgg = (A) aggs.get(0).doReduce(internalAggregations, + new InternalAggregation.ReduceContext(root.context().bigArrays(), null, false)); + aggs.removeAll(internalAggregations); + aggs.add(internalAgg); + } + // now do the final reduce @SuppressWarnings("unchecked") - A internalAgg = (A) aggs.get(0).doReduce(aggs, new InternalAggregation.ReduceContext(root.context().bigArrays(), null)); + A internalAgg = (A) aggs.get(0).doReduce(aggs, new InternalAggregation.ReduceContext(root.context().bigArrays(), null, + true)); return internalAgg; } } finally { diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationTestCase.java 
b/core/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationTestCase.java index 9ea06f3086fac..f7d105ac8dab5 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationTestCase.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/InternalAggregationTestCase.java @@ -57,8 +57,17 @@ public final void testReduceRandom() { inputs.add(t); toReduce.add(t); } + if (randomBoolean() && toReduceSize >= 4) { + List internalAggregations = randomSubsetOf(randomIntBetween(2, toReduceSize - 2), toReduce); + InternalAggregation.ReduceContext context = new InternalAggregation.ReduceContext(null, null, false); + @SuppressWarnings("unchecked") + T reduced = (T) inputs.get(0).reduce(internalAggregations, context); + toReduce.removeAll(internalAggregations); + toReduce.add(reduced); + } + InternalAggregation.ReduceContext context = new InternalAggregation.ReduceContext(null, null, true); @SuppressWarnings("unchecked") - T reduced = (T) inputs.get(0).reduce(toReduce, null); + T reduced = (T) inputs.get(0).reduce(toReduce, context); assertReduced(reduced, inputs); } diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/MinDocCountIT.java b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/MinDocCountIT.java index 925ff86232a76..7307b756e3f1f 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/MinDocCountIT.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/MinDocCountIT.java @@ -170,9 +170,9 @@ private void assertSubset(Terms terms1, Terms terms2, long minDocCount, int size if (size2++ == size) { break; } - assertTrue(it2.hasNext()); + assertTrue("minDocCount: " + minDocCount, it2.hasNext()); final Terms.Bucket bucket2 = it2.next(); - assertEquals(bucket1.getDocCount(), bucket2.getDocCount()); + assertEquals("minDocCount: " + minDocCount, bucket1.getDocCount(), bucket2.getDocCount()); } } assertFalse(it2.hasNext()); @@ -336,24 +336,8 @@ private void testMinDocCountOnTerms(String field, Script script, Terms.Order ord .shardSize(cardinality + randomInt(10)) .minDocCount(minDocCount)).request(); final SearchResponse response = client().search(request).get(); - try { - assertAllSuccessful(response); - assertSubset(allTerms, (Terms) response.getAggregations().get("terms"), minDocCount, size, include); - } catch (AssertionError ae) { - if (!retry) { - throw ae; - } - logger.info("test failed. trying to see if it recovers after 1m.", ae); - try { - Thread.sleep(60000); - logger.debug("1m passed. 
retrying."); - testMinDocCountOnTerms(field, script, order, include, false); - } catch (Exception secondFailure) { - secondFailure.addSuppressed(ae); - logger.error("exception on retry (will re-throw the original in a sec)", secondFailure); - } - throw ae; - } + assertAllSuccessful(response); + assertSubset(allTerms, (Terms) response.getAggregations().get("terms"), minDocCount, size, include); } } diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/TermsDocCountErrorIT.java b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/TermsDocCountErrorIT.java index f2a030ab5e8ad..a7173dc4c22b3 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/TermsDocCountErrorIT.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/TermsDocCountErrorIT.java @@ -20,7 +20,10 @@ package org.elasticsearch.search.aggregations.bucket; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.FilterClient; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; @@ -30,10 +33,12 @@ import org.elasticsearch.search.aggregations.bucket.terms.Terms.Order; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregatorFactory.ExecutionMode; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.client.RandomizingClient; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.function.Function; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.search.aggregations.AggregationBuilders.sum; @@ -53,12 +58,43 @@ public class TermsDocCountErrorIT extends ESIntegTestCase { private static final String LONG_FIELD_NAME = "l_value"; private static final String DOUBLE_FIELD_NAME = "d_value"; + public static String randomExecutionHint() { return randomBoolean() ? null : randomFrom(ExecutionMode.values()).toString(); } private static int numRoutingValues; + public static Client client() { + Client client = ESIntegTestCase.client(); + if (client instanceof RandomizingClient) { + return new FilterClient(client) { + /* this test doesn't work with multiple reduce phases since: + * the error for a term is the sum of the errors across all aggs that need to be reduced. + * if the term is in the aggregation, then we just use the associated error, but if it is not we need to use the + * aggregation-level error, ie. the maximum count that a doc that is not in the top list could have. + * + * the problem is that the logic we have today assumes there is a single reduce. So for instance for the agg-level error + * it takes the count of the last term. This is correct if the agg was produced on a shard: if it had a greater count + * then it would be in the top list. However if we are on an intermediate reduce, this does not work anymore. + * + * Another assumption that does not hold is that right now if a term is present in an agg, we assume its count is accurate. + * Again this is true if the agg was produced on a shard, but not if this is the result of an intermediate reduce. 
+ * + * try with this seed and remove the setReduceUpTo below + * -Dtests.seed=B32081B1E8589AE5 -Dtests.class=org.elasticsearch.search.aggregations.bucket.TermsDocCountErrorIT + * -Dtests.method="testDoubleValueField" -Dtests.locale=lv -Dtests.timezone=WET + * This will be addressed in a follow-up to #23253 + */ + @Override + public SearchRequestBuilder prepareSearch(String... indices) { + return this.in.prepareSearch(indices).setReduceUpTo(512); + } + }; + } + return client; + } + @Override public void setupSuiteScopeCluster() throws Exception { assertAcked(client().admin().indices().prepareCreate("idx") diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificanceHeuristicTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificanceHeuristicTests.java index e896e1cc1fb82..2dc208d89fb44 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificanceHeuristicTests.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/significant/SignificanceHeuristicTests.java @@ -146,7 +146,8 @@ SignificanceHeuristic getRandomSignificanceheuristic() { public void testReduce() { List aggs = createInternalAggregations(); - SignificantTerms reducedAgg = (SignificantTerms) aggs.get(0).doReduce(aggs, null); + InternalAggregation.ReduceContext context = new InternalAggregation.ReduceContext(null, null, true); + SignificantTerms reducedAgg = (SignificantTerms) aggs.get(0).doReduce(aggs, context); assertThat(reducedAgg.getBuckets().size(), equalTo(2)); assertThat(reducedAgg.getBuckets().get(0).getSubsetDf(), equalTo(8L)); assertThat(reducedAgg.getBuckets().get(0).getSubsetSize(), equalTo(16L)); @@ -264,7 +265,7 @@ protected void checkParseException(ParseFieldRegistry significanceHeuristicParserRegistry, String heuristicString) throws IOException { - XContentParser stParser = createParser(JsonXContent.jsonXContent, + XContentParser stParser = createParser(JsonXContent.jsonXContent, "{\"field\":\"text\", " + heuristicString + ", \"min_doc_count\":200}"); return parseSignificanceHeuristic(significanceHeuristicParserRegistry, stParser); } diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java index cb3165f2beda6..f2977fd769205 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java @@ -131,7 +131,7 @@ public void testMixLongAndDouble() throws Exception { } InternalAggregation.ReduceContext ctx = new InternalAggregation.ReduceContext(new MockBigArrays(Settings.EMPTY, - new NoneCircuitBreakerService()), null); + new NoneCircuitBreakerService()), null, true); for (InternalAggregation internalAgg : aggs) { InternalAggregation mergedAggs = internalAgg.doReduce(aggs, ctx); assertTrue(mergedAggs instanceof DoubleTerms); diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/metrics/TopHitsIT.java b/core/src/test/java/org/elasticsearch/search/aggregations/metrics/TopHitsIT.java index b2adee43a4baf..9ca7726b16445 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/metrics/TopHitsIT.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/metrics/TopHitsIT.java @@ -421,7 +421,7 @@ public void testBreadthFirstWithAggOrderAndScoreNeeded() 
throws Exception { assertThat(hits.getHits().length, equalTo(3)); assertThat(hits.getAt(0).getSourceAsMap().size(), equalTo(4)); - id --; + id--; } } @@ -452,7 +452,8 @@ public void testPagination() throws Exception { .executionHint(randomExecutionHint()) .field(TERMS_AGGS_FIELD) .subAggregation( - topHits("hits").sort(SortBuilders.fieldSort(SORT_FIELD).order(SortOrder.DESC)) + topHits("hits") + .sort(SortBuilders.fieldSort(SORT_FIELD).order(SortOrder.DESC)) .from(from) .size(size) ) @@ -483,7 +484,8 @@ public void testPagination() throws Exception { assertThat(hits.getTotalHits(), equalTo(controlHits.getTotalHits())); assertThat(hits.getHits().length, equalTo(controlHits.getHits().length)); for (int i = 0; i < hits.getHits().length; i++) { - logger.info("{}: top_hits: [{}][{}] control: [{}][{}]", i, hits.getAt(i).getId(), hits.getAt(i).getSortValues()[0], controlHits.getAt(i).getId(), controlHits.getAt(i).getSortValues()[0]); + logger.info("{}: top_hits: [{}][{}] control: [{}][{}]", i, hits.getAt(i).getId(), hits.getAt(i).getSortValues()[0], + controlHits.getAt(i).getId(), controlHits.getAt(i).getSortValues()[0]); assertThat(hits.getAt(i).getId(), equalTo(controlHits.getAt(i).getId())); assertThat(hits.getAt(i).getSortValues()[0], equalTo(controlHits.getAt(i).getSortValues()[0])); } @@ -1000,51 +1002,55 @@ public void testNoStoredFields() throws Exception { * not using a script does get cached. */ public void testDontCacheScripts() throws Exception { - assertAcked(prepareCreate("cache_test_idx").addMapping("type", "d", "type=long") + try { + assertAcked(prepareCreate("cache_test_idx").addMapping("type", "d", "type=long") .setSettings(Settings.builder().put("requests.cache.enable", true).put("number_of_shards", 1).put("number_of_replicas", 1)) .get()); - indexRandom(true, client().prepareIndex("cache_test_idx", "type", "1").setSource("s", 1), + indexRandom(true, client().prepareIndex("cache_test_idx", "type", "1").setSource("s", 1), client().prepareIndex("cache_test_idx", "type", "2").setSource("s", 2)); - // Make sure we are starting with a clear cache - assertThat(client().admin().indices().prepareStats("cache_test_idx").setRequestCache(true).get().getTotal().getRequestCache() + // Make sure we are starting with a clear cache + assertThat(client().admin().indices().prepareStats("cache_test_idx").setRequestCache(true).get().getTotal().getRequestCache() .getHitCount(), equalTo(0L)); - assertThat(client().admin().indices().prepareStats("cache_test_idx").setRequestCache(true).get().getTotal().getRequestCache() + assertThat(client().admin().indices().prepareStats("cache_test_idx").setRequestCache(true).get().getTotal().getRequestCache() .getMissCount(), equalTo(0L)); - // Test that a request using a script field does not get cached - SearchResponse r = client().prepareSearch("cache_test_idx").setSize(0) + // Test that a request using a script field does not get cached + SearchResponse r = client().prepareSearch("cache_test_idx").setSize(0) .addAggregation(topHits("foo").scriptField("bar", new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "5", Collections.emptyMap()))).get(); - assertSearchResponse(r); + assertSearchResponse(r); - assertThat(client().admin().indices().prepareStats("cache_test_idx").setRequestCache(true).get().getTotal().getRequestCache() + assertThat(client().admin().indices().prepareStats("cache_test_idx").setRequestCache(true).get().getTotal().getRequestCache() .getHitCount(), equalTo(0L)); - 
assertThat(client().admin().indices().prepareStats("cache_test_idx").setRequestCache(true).get().getTotal().getRequestCache() + assertThat(client().admin().indices().prepareStats("cache_test_idx").setRequestCache(true).get().getTotal().getRequestCache() .getMissCount(), equalTo(0L)); - // Test that a request using a script sort does not get cached - r = client().prepareSearch("cache_test_idx").setSize(0) + // Test that a request using a script sort does not get cached + r = client().prepareSearch("cache_test_idx").setSize(0) .addAggregation(topHits("foo").sort( - SortBuilders.scriptSort( - new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "5", Collections.emptyMap()), ScriptSortType.STRING))) + SortBuilders.scriptSort( + new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "5", Collections.emptyMap()), ScriptSortType.STRING))) .get(); - assertSearchResponse(r); + assertSearchResponse(r); - assertThat(client().admin().indices().prepareStats("cache_test_idx").setRequestCache(true).get().getTotal().getRequestCache() + assertThat(client().admin().indices().prepareStats("cache_test_idx").setRequestCache(true).get().getTotal().getRequestCache() .getHitCount(), equalTo(0L)); - assertThat(client().admin().indices().prepareStats("cache_test_idx").setRequestCache(true).get().getTotal().getRequestCache() + assertThat(client().admin().indices().prepareStats("cache_test_idx").setRequestCache(true).get().getTotal().getRequestCache() .getMissCount(), equalTo(0L)); - // To make sure that the cache is working test that a request not using - // a script is cached - r = client().prepareSearch("cache_test_idx").setSize(0).addAggregation(topHits("foo")).get(); - assertSearchResponse(r); + // To make sure that the cache is working test that a request not using + // a script is cached + r = client().prepareSearch("cache_test_idx").setSize(0).addAggregation(topHits("foo")).get(); + assertSearchResponse(r); - assertThat(client().admin().indices().prepareStats("cache_test_idx").setRequestCache(true).get().getTotal().getRequestCache() + assertThat(client().admin().indices().prepareStats("cache_test_idx").setRequestCache(true).get().getTotal().getRequestCache() .getHitCount(), equalTo(0L)); - assertThat(client().admin().indices().prepareStats("cache_test_idx").setRequestCache(true).get().getTotal().getRequestCache() + assertThat(client().admin().indices().prepareStats("cache_test_idx").setRequestCache(true).get().getTotal().getRequestCache() .getMissCount(), equalTo(1L)); + } finally { + assertAcked(client().admin().indices().prepareDelete("cache_test_idx")); // delete the index - otherwise repeated runs with tests.iters would fail to recreate it + } } public void testWithRescore() { diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/metrics/avg/InternalAvgTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/metrics/avg/InternalAvgTests.java index cdf13d04444c4..9dd5715ff93de 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/metrics/avg/InternalAvgTests.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/metrics/avg/InternalAvgTests.java @@ -50,8 +50,7 @@ protected void assertReduced(InternalAvg reduced, List inputs) { counts += in.getCount(); } assertEquals(counts, reduced.getCount()); - assertEquals(sum, reduced.getSum(), Double.MIN_VALUE); - assertEquals(sum / counts, reduced.value(), Double.MIN_VALUE); + assertEquals(sum, reduced.getSum(), 0.00000001); + assertEquals(sum / counts, reduced.value(), 0.00000001); } - } diff --git 
a/test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java b/test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java index 5814cac1316bd..77fb115c5f717 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java +++ b/test/framework/src/main/java/org/elasticsearch/test/client/RandomizingClient.java @@ -36,6 +36,7 @@ public class RandomizingClient extends FilterClient { private final SearchType defaultSearchType; private final String defaultPreference; + private final int reduceUpTo; public RandomizingClient(Client client, Random random) { @@ -53,12 +54,13 @@ public RandomizingClient(Client client, Random random) { } else { defaultPreference = null; } + this.reduceUpTo = 2 + random.nextInt(10); } - + @Override public SearchRequestBuilder prepareSearch(String... indices) { - return in.prepareSearch(indices).setSearchType(defaultSearchType).setPreference(defaultPreference); + return in.prepareSearch(indices).setSearchType(defaultSearchType).setPreference(defaultPreference).setReduceUpTo(reduceUpTo); } @Override