From aaa6e78085c9edffe3d735d8f311b6bec0d4bd3a Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Wed, 15 Nov 2023 00:47:01 +0100
Subject: [PATCH] Move LeakTracker to production code and use it in assertions around QuerySearchResult (#102179)

Part of the ref counted search hits effort requires us to correctly ref count
`FetchSearchPhase`. This commit moves us one step in that direction by adding
tests that ensure `QuerySearchResult` is ref counted correctly, and it fixes
one production code spot where it wasn't (albeit that spot worked out for
other reasons anyway). This is done by moving the leak tracker to production
code and using it selectively when assertions are enabled.
---
 .../search/fetch/QueryFetchSearchResult.java  |  13 +-
 .../search/query/QueryPhase.java              |  15 +-
 .../search/query/QuerySearchResult.java       |   7 +-
 .../search/rank/RankSearchContext.java        |   1 +
 .../elasticsearch/transport/LeakTracker.java  |  37 +
 .../action/search/DfsQueryPhaseTests.java     |  88 +-
 .../action/search/FetchSearchPhaseTests.java  | 239 +++---
 .../search/SearchPhaseControllerTests.java    | 768 ++++++++++--------
 .../SearchQueryThenFetchAsyncActionTests.java |  75 +-
 .../search/SearchServiceTests.java            | 250 +++---
 .../search/query/QueryPhaseTests.java         |  63 +-
 .../search/query/QuerySearchResultTests.java  |  50 +-
 .../search/MockSearchService.java             |   7 +-
 13 files changed, 927 insertions(+), 686 deletions(-)
 rename {test/framework => server}/src/main/java/org/elasticsearch/transport/LeakTracker.java (89%)

diff --git a/server/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java b/server/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java index 78d6882472ebd..193f8c04664bf 100644 --- a/server/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java @@ -16,6 +16,7 @@ import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.ShardSearchContextId; import org.elasticsearch.search.query.QuerySearchResult; +import org.elasticsearch.transport.LeakTracker; import java.io.IOException; @@ -26,14 +27,8 @@ public final class QueryFetchSearchResult extends SearchPhaseResult { private final RefCounted refCounted; public QueryFetchSearchResult(StreamInput in) throws IOException { - super(in); // These get a ref count of 1 when we create them, so we don't need to incRef here - queryResult = new QuerySearchResult(in); - fetchResult = new FetchSearchResult(in); - refCounted = AbstractRefCounted.of(() -> { - queryResult.decRef(); - fetchResult.decRef(); - }); + this(new QuerySearchResult(in), new FetchSearchResult(in)); } public QueryFetchSearchResult(QuerySearchResult queryResult, FetchSearchResult fetchResult) { @@ -42,10 +37,10 @@ public QueryFetchSearchResult(QuerySearchResult queryResult, FetchSearchResult f // We're acquiring a copy, we should incRef it this.queryResult.incRef(); this.fetchResult.incRef(); - refCounted = AbstractRefCounted.of(() -> { + refCounted = LeakTracker.wrap(AbstractRefCounted.of(() -> { queryResult.decRef(); fetchResult.decRef(); - }); + })); } @Override
diff --git a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java index 3044d15ab8552..01015ec8cc78e 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -94,13 +94,14 
@@ static void executeRank(SearchContext searchContext) throws QueryPhaseExecutionE if (searchTimedOut) { break; } - RankSearchContext rankSearchContext = new RankSearchContext(searchContext, rankQuery, rankShardContext.windowSize()); - QueryPhase.addCollectorsAndSearch(rankSearchContext); - QuerySearchResult rrfQuerySearchResult = rankSearchContext.queryResult(); - rrfRankResults.add(rrfQuerySearchResult.topDocs().topDocs); - serviceTimeEWMA += rrfQuerySearchResult.serviceTimeEWMA(); - nodeQueueSize = Math.max(nodeQueueSize, rrfQuerySearchResult.nodeQueueSize()); - searchTimedOut = rrfQuerySearchResult.searchTimedOut(); + try (RankSearchContext rankSearchContext = new RankSearchContext(searchContext, rankQuery, rankShardContext.windowSize())) { + QueryPhase.addCollectorsAndSearch(rankSearchContext); + QuerySearchResult rrfQuerySearchResult = rankSearchContext.queryResult(); + rrfRankResults.add(rrfQuerySearchResult.topDocs().topDocs); + serviceTimeEWMA += rrfQuerySearchResult.serviceTimeEWMA(); + nodeQueueSize = Math.max(nodeQueueSize, rrfQuerySearchResult.nodeQueueSize()); + searchTimedOut = rrfQuerySearchResult.searchTimedOut(); + } } querySearchResult.setRankShardResult(rankShardContext.combine(rrfRankResults)); diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index edebf602af188..301d7fb219ca7 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -31,6 +31,7 @@ import org.elasticsearch.search.profile.SearchProfileQueryPhaseResult; import org.elasticsearch.search.rank.RankShardResult; import org.elasticsearch.search.suggest.Suggest; +import org.elasticsearch.transport.LeakTracker; import java.io.IOException; import java.util.ArrayList; @@ -104,8 +105,8 @@ public QuerySearchResult(ShardSearchContextId contextId, SearchShardTarget shard setSearchShardTarget(shardTarget); isNull = false; setShardSearchRequest(shardSearchRequest); - this.refCounted = AbstractRefCounted.of(this::close); this.toRelease = new ArrayList<>(); + this.refCounted = LeakTracker.wrap(AbstractRefCounted.of(() -> Releasables.close(toRelease))); } private QuerySearchResult(boolean isNull) { @@ -245,10 +246,6 @@ public void releaseAggs() { } } - private void close() { - Releasables.close(toRelease); - } - public void addReleasable(Releasable releasable) { toRelease.add(releasable); } diff --git a/server/src/main/java/org/elasticsearch/search/rank/RankSearchContext.java b/server/src/main/java/org/elasticsearch/search/rank/RankSearchContext.java index 84f04283d64e8..ed6fcd16fb5e2 100644 --- a/server/src/main/java/org/elasticsearch/search/rank/RankSearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/rank/RankSearchContext.java @@ -64,6 +64,7 @@ public RankSearchContext(SearchContext parent, Query rankQuery, int windowSize) this.rankQuery = parent.buildFilteredQuery(rankQuery); this.windowSize = windowSize; this.querySearchResult = new QuerySearchResult(parent.readerContext().id(), parent.shardTarget(), parent.request()); + this.addReleasable(querySearchResult::decRef); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/transport/LeakTracker.java b/server/src/main/java/org/elasticsearch/transport/LeakTracker.java similarity index 89% rename from test/framework/src/main/java/org/elasticsearch/transport/LeakTracker.java rename to 
server/src/main/java/org/elasticsearch/transport/LeakTracker.java index ce82e62df698a..4eefd4cd2080a 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/LeakTracker.java +++ b/server/src/main/java/org/elasticsearch/transport/LeakTracker.java @@ -13,6 +13,8 @@ import org.elasticsearch.common.Randomness; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.core.Assertions; +import org.elasticsearch.core.RefCounted; import java.lang.ref.ReferenceQueue; import java.lang.ref.WeakReference; @@ -69,6 +71,41 @@ public void reportLeak() { } } + public static RefCounted wrap(RefCounted refCounted) { + if (Assertions.ENABLED == false) { + return refCounted; + } + var leak = INSTANCE.track(refCounted); + return new RefCounted() { + @Override + public void incRef() { + leak.record(); + refCounted.incRef(); + } + + @Override + public boolean tryIncRef() { + leak.record(); + return refCounted.tryIncRef(); + } + + @Override + public boolean decRef() { + if (refCounted.decRef()) { + leak.close(refCounted); + return true; + } + leak.record(); + return false; + } + + @Override + public boolean hasReferences() { + return refCounted.hasReferences(); + } + }; + } + public static final class Leak extends WeakReference { @SuppressWarnings({ "unchecked", "rawtypes" }) diff --git a/server/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java b/server/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java index 65a0950d05b4d..b896ae3d3f025 100644 --- a/server/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/DfsQueryPhaseTests.java @@ -81,30 +81,38 @@ public void sendExecuteQuery( new SearchShardTarget("node1", new ShardId("test", "na", 0), null), null ); - queryResult.topDocs( - new TopDocsAndMaxScore( - new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(42, 1.0F) }), - 2.0F - ), - new DocValueFormat[0] - ); - queryResult.size(2); // the size of the result set - listener.onResponse(queryResult); + try { + queryResult.topDocs( + new TopDocsAndMaxScore( + new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(42, 1.0F) }), + 2.0F + ), + new DocValueFormat[0] + ); + queryResult.size(2); // the size of the result set + listener.onResponse(queryResult); + } finally { + queryResult.decRef(); + } } else if (request.contextId().getId() == 2) { QuerySearchResult queryResult = new QuerySearchResult( new ShardSearchContextId("", 123), new SearchShardTarget("node2", new ShardId("test", "na", 0), null), null ); - queryResult.topDocs( - new TopDocsAndMaxScore( - new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(84, 2.0F) }), - 2.0F - ), - new DocValueFormat[0] - ); - queryResult.size(2); // the size of the result set - listener.onResponse(queryResult); + try { + queryResult.topDocs( + new TopDocsAndMaxScore( + new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(84, 2.0F) }), + 2.0F + ), + new DocValueFormat[0] + ); + queryResult.size(2); // the size of the result set + listener.onResponse(queryResult); + } finally { + queryResult.decRef(); + } } else { fail("no such request ID: " + request.contextId()); } @@ -172,15 +180,19 @@ public void sendExecuteQuery( new SearchShardTarget("node1", new ShardId("test", "na", 0), null), null ); - queryResult.topDocs( - new 
TopDocsAndMaxScore( - new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(42, 1.0F) }), - 2.0F - ), - new DocValueFormat[0] - ); - queryResult.size(2); // the size of the result set - listener.onResponse(queryResult); + try { + queryResult.topDocs( + new TopDocsAndMaxScore( + new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(42, 1.0F) }), + 2.0F + ), + new DocValueFormat[0] + ); + queryResult.size(2); // the size of the result set + listener.onResponse(queryResult); + } finally { + queryResult.decRef(); + } } else if (request.contextId().getId() == 2) { listener.onFailure(new MockDirectoryWrapper.FakeIOException()); } else { @@ -252,15 +264,19 @@ public void sendExecuteQuery( new SearchShardTarget("node1", new ShardId("test", "na", 0), null), null ); - queryResult.topDocs( - new TopDocsAndMaxScore( - new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(42, 1.0F) }), - 2.0F - ), - new DocValueFormat[0] - ); - queryResult.size(2); // the size of the result set - listener.onResponse(queryResult); + try { + queryResult.topDocs( + new TopDocsAndMaxScore( + new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(42, 1.0F) }), + 2.0F + ), + new DocValueFormat[0] + ); + queryResult.size(2); // the size of the result set + listener.onResponse(queryResult); + } finally { + queryResult.decRef(); + } } else if (request.contextId().getId() == 2) { throw new UncheckedIOException(new MockDirectoryWrapper.FakeIOException()); } else { diff --git a/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java b/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java index 82e579ce7eb36..3fa5c6fc4283a 100644 --- a/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java @@ -76,8 +76,12 @@ public void testShortcutQueryAndFetchOptimization() { SearchHits hits = new SearchHits(new SearchHit[] { new SearchHit(42) }, new TotalHits(1, TotalHits.Relation.EQUAL_TO), 1.0F); fetchResult.shardResult(hits, fetchProfile(profiled)); QueryFetchSearchResult fetchSearchResult = new QueryFetchSearchResult(queryResult, fetchResult); - fetchSearchResult.setShardIndex(0); - results.consumeResult(fetchSearchResult, () -> {}); + try { + fetchSearchResult.setShardIndex(0); + results.consumeResult(fetchSearchResult, () -> {}); + } finally { + fetchSearchResult.decRef(); + } numHits = 1; } else { numHits = 0; @@ -135,33 +139,42 @@ public void testFetchTwoDocument() { ShardSearchContextId ctx1 = new ShardSearchContextId(UUIDs.base64UUID(), 123); SearchShardTarget shard1Target = new SearchShardTarget("node1", new ShardId("test", "na", 0), null); + SearchShardTarget shard2Target = new SearchShardTarget("node2", new ShardId("test", "na", 1), null); QuerySearchResult queryResult = new QuerySearchResult(ctx1, shard1Target, null); - queryResult.topDocs( - new TopDocsAndMaxScore( - new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(42, 1.0F) }), - 2.0F - ), - new DocValueFormat[0] - ); - queryResult.size(resultSetSize); // the size of the result set - queryResult.setShardIndex(0); - addProfiling(profiled, queryResult); - results.consumeResult(queryResult, () -> {}); + try { + queryResult.topDocs( + new TopDocsAndMaxScore( + new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new 
ScoreDoc(42, 1.0F) }), + 2.0F + ), + new DocValueFormat[0] + ); + queryResult.size(resultSetSize); // the size of the result set + queryResult.setShardIndex(0); + addProfiling(profiled, queryResult); + results.consumeResult(queryResult, () -> {}); + + } finally { + queryResult.decRef(); + } final ShardSearchContextId ctx2 = new ShardSearchContextId(UUIDs.base64UUID(), 321); - SearchShardTarget shard2Target = new SearchShardTarget("node2", new ShardId("test", "na", 1), null); - queryResult = new QuerySearchResult(ctx2, shard2Target, null); - queryResult.topDocs( - new TopDocsAndMaxScore( - new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(84, 2.0F) }), - 2.0F - ), - new DocValueFormat[0] - ); - queryResult.size(resultSetSize); - queryResult.setShardIndex(1); - addProfiling(profiled, queryResult); - results.consumeResult(queryResult, () -> {}); + try { + queryResult = new QuerySearchResult(ctx2, shard2Target, null); + queryResult.topDocs( + new TopDocsAndMaxScore( + new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(84, 2.0F) }), + 2.0F + ), + new DocValueFormat[0] + ); + queryResult.size(resultSetSize); + queryResult.setShardIndex(1); + addProfiling(profiled, queryResult); + results.consumeResult(queryResult, () -> {}); + } finally { + queryResult.decRef(); + } mockSearchPhaseContext.searchTransport = new SearchTransportService(null, null, null) { @Override @@ -228,31 +241,39 @@ public void testFailFetchOneDoc() { final ShardSearchContextId ctx = new ShardSearchContextId(UUIDs.base64UUID(), 123); SearchShardTarget shard1Target = new SearchShardTarget("node1", new ShardId("test", "na", 0), null); QuerySearchResult queryResult = new QuerySearchResult(ctx, shard1Target, null); - queryResult.topDocs( - new TopDocsAndMaxScore( - new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(42, 1.0F) }), - 2.0F - ), - new DocValueFormat[0] - ); - queryResult.size(resultSetSize); // the size of the result set - queryResult.setShardIndex(0); - addProfiling(profiled, queryResult); - results.consumeResult(queryResult, () -> {}); + try { + queryResult.topDocs( + new TopDocsAndMaxScore( + new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(42, 1.0F) }), + 2.0F + ), + new DocValueFormat[0] + ); + queryResult.size(resultSetSize); // the size of the result set + queryResult.setShardIndex(0); + addProfiling(profiled, queryResult); + results.consumeResult(queryResult, () -> {}); + } finally { + queryResult.decRef(); + } SearchShardTarget shard2Target = new SearchShardTarget("node2", new ShardId("test", "na", 1), null); queryResult = new QuerySearchResult(new ShardSearchContextId("", 321), shard2Target, null); - queryResult.topDocs( - new TopDocsAndMaxScore( - new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(84, 2.0F) }), - 2.0F - ), - new DocValueFormat[0] - ); - queryResult.size(resultSetSize); - queryResult.setShardIndex(1); - addProfiling(profiled, queryResult); - results.consumeResult(queryResult, () -> {}); + try { + queryResult.topDocs( + new TopDocsAndMaxScore( + new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(84, 2.0F) }), + 2.0F + ), + new DocValueFormat[0] + ); + queryResult.size(resultSetSize); + queryResult.setShardIndex(1); + addProfiling(profiled, queryResult); + results.consumeResult(queryResult, () -> {}); + } finally { + queryResult.decRef(); + } 
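// Every test hunk in this patch follows the same shape: the QuerySearchResult is
// created with a reference count of one, the consumer acquires any references it
// needs for itself, and the test drops its own reference in a finally block so the
// LeakTracker (active only when assertions are enabled) sees a balanced count.
// A minimal sketch of that shape -- illustrative only, not a hunk from this patch:
//
//     QuerySearchResult result = new QuerySearchResult(ctx, shardTarget, null); // ref count == 1
//     try {
//         result.setShardIndex(0);
//         results.consumeResult(result, () -> {}); // consumer takes its own reference if it retains the result
//     } finally {
//         result.decRef(); // release the creator's reference
//     }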
mockSearchPhaseContext.searchTransport = new SearchTransportService(null, null, null) { @Override @@ -345,10 +366,14 @@ public void testFetchDocsConcurrently() throws InterruptedException { ), new DocValueFormat[0] ); - queryResult.size(resultSetSize); // the size of the result set - queryResult.setShardIndex(i); - addProfiling(profiled, queryResult); - results.consumeResult(queryResult, () -> {}); + try { + queryResult.size(resultSetSize); // the size of the result set + queryResult.setShardIndex(i); + addProfiling(profiled, queryResult); + results.consumeResult(queryResult, () -> {}); + } finally { + queryResult.decRef(); + } } mockSearchPhaseContext.searchTransport = new SearchTransportService(null, null, null) { @Override @@ -437,32 +462,39 @@ public void testExceptionFailsPhase() { boolean profiled = randomBoolean(); SearchShardTarget shard1Target = new SearchShardTarget("node1", new ShardId("test", "na", 0), null); - QuerySearchResult queryResult = new QuerySearchResult(new ShardSearchContextId("", 123), shard1Target, null); - queryResult.topDocs( - new TopDocsAndMaxScore( - new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(42, 1.0F) }), - 2.0F - ), - new DocValueFormat[0] - ); - queryResult.size(resultSetSize); // the size of the result set - queryResult.setShardIndex(0); - addProfiling(profiled, queryResult); - results.consumeResult(queryResult, () -> {}); - SearchShardTarget shard2Target = new SearchShardTarget("node1", new ShardId("test", "na", 0), null); - queryResult = new QuerySearchResult(new ShardSearchContextId("", 321), shard2Target, null); - queryResult.topDocs( - new TopDocsAndMaxScore( - new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(84, 2.0F) }), - 2.0F - ), - new DocValueFormat[0] - ); - queryResult.size(resultSetSize); - queryResult.setShardIndex(1); - addProfiling(profiled, queryResult); - results.consumeResult(queryResult, () -> {}); + QuerySearchResult queryResult = new QuerySearchResult(new ShardSearchContextId("", 123), shard1Target, null); + try { + queryResult.topDocs( + new TopDocsAndMaxScore( + new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(42, 1.0F) }), + 2.0F + ), + new DocValueFormat[0] + ); + queryResult.size(resultSetSize); // the size of the result set + queryResult.setShardIndex(0); + addProfiling(profiled, queryResult); + results.consumeResult(queryResult, () -> {}); + } finally { + queryResult.decRef(); + } + try { + queryResult = new QuerySearchResult(new ShardSearchContextId("", 321), shard2Target, null); + queryResult.topDocs( + new TopDocsAndMaxScore( + new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(84, 2.0F) }), + 2.0F + ), + new DocValueFormat[0] + ); + queryResult.size(resultSetSize); + queryResult.setShardIndex(1); + addProfiling(profiled, queryResult); + results.consumeResult(queryResult, () -> {}); + } finally { + queryResult.decRef(); + } AtomicInteger numFetches = new AtomicInteger(0); mockSearchPhaseContext.searchTransport = new SearchTransportService(null, null, null) { @@ -527,32 +559,39 @@ public void testCleanupIrrelevantContexts() { // contexts that are not fetched s final ShardSearchContextId ctx1 = new ShardSearchContextId(UUIDs.base64UUID(), 123); SearchShardTarget shard1Target = new SearchShardTarget("node1", new ShardId("test", "na", 0), null); QuerySearchResult queryResult = new QuerySearchResult(ctx1, shard1Target, null); - queryResult.topDocs( - new 
TopDocsAndMaxScore( - new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(42, 1.0F) }), - 2.0F - ), - new DocValueFormat[0] - ); - queryResult.size(resultSetSize); // the size of the result set - queryResult.setShardIndex(0); - addProfiling(profiled, queryResult); - results.consumeResult(queryResult, () -> {}); - + try { + queryResult.topDocs( + new TopDocsAndMaxScore( + new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(42, 1.0F) }), + 2.0F + ), + new DocValueFormat[0] + ); + queryResult.size(resultSetSize); // the size of the result set + queryResult.setShardIndex(0); + addProfiling(profiled, queryResult); + results.consumeResult(queryResult, () -> {}); + } finally { + queryResult.decRef(); + } final ShardSearchContextId ctx2 = new ShardSearchContextId(UUIDs.base64UUID(), 321); SearchShardTarget shard2Target = new SearchShardTarget("node2", new ShardId("test", "na", 1), null); queryResult = new QuerySearchResult(ctx2, shard2Target, null); - queryResult.topDocs( - new TopDocsAndMaxScore( - new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(84, 2.0F) }), - 2.0F - ), - new DocValueFormat[0] - ); - queryResult.size(resultSetSize); - queryResult.setShardIndex(1); - addProfiling(profiled, queryResult); - results.consumeResult(queryResult, () -> {}); + try { + queryResult.topDocs( + new TopDocsAndMaxScore( + new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(84, 2.0F) }), + 2.0F + ), + new DocValueFormat[0] + ); + queryResult.size(resultSetSize); + queryResult.setShardIndex(1); + addProfiling(profiled, queryResult); + results.consumeResult(queryResult, () -> {}); + } finally { + queryResult.decRef(); + } mockSearchPhaseContext.searchTransport = new SearchTransportService(null, null, null) { @Override diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java index 93436ed9b0768..424ccdafb87e4 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java @@ -72,6 +72,7 @@ import org.elasticsearch.test.InternalAggregationTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportMessage; import org.junit.After; import org.junit.Before; @@ -155,27 +156,31 @@ public void testSortDocs() { int nShards = randomIntBetween(1, 20); int queryResultSize = randomBoolean() ? 
0 : randomIntBetween(1, nShards * 2); AtomicArray results = generateQueryResults(nShards, suggestions, queryResultSize, false, false, false); - Optional first = results.asList().stream().findFirst(); - int from = 0, size = 0; - if (first.isPresent()) { - from = first.get().queryResult().from(); - size = first.get().queryResult().size(); - } - int accumulatedLength = Math.min(queryResultSize, getTotalQueryHits(results)); - List reducedCompletionSuggestions = reducedSuggest(results); - for (Suggest.Suggestion suggestion : reducedCompletionSuggestions) { - int suggestionSize = suggestion.getEntries().get(0).getOptions().size(); - accumulatedLength += suggestionSize; - } - List topDocsList = new ArrayList<>(); - for (SearchPhaseResult result : results.asList()) { - QuerySearchResult queryResult = result.queryResult(); - TopDocs topDocs = queryResult.consumeTopDocs().topDocs; - SearchPhaseController.setShardIndex(topDocs, result.getShardIndex()); - topDocsList.add(topDocs); + try { + Optional first = results.asList().stream().findFirst(); + int from = 0, size = 0; + if (first.isPresent()) { + from = first.get().queryResult().from(); + size = first.get().queryResult().size(); + } + int accumulatedLength = Math.min(queryResultSize, getTotalQueryHits(results)); + List reducedCompletionSuggestions = reducedSuggest(results); + for (Suggest.Suggestion suggestion : reducedCompletionSuggestions) { + int suggestionSize = suggestion.getEntries().get(0).getOptions().size(); + accumulatedLength += suggestionSize; + } + List topDocsList = new ArrayList<>(); + for (SearchPhaseResult result : results.asList()) { + QuerySearchResult queryResult = result.queryResult(); + TopDocs topDocs = queryResult.consumeTopDocs().topDocs; + SearchPhaseController.setShardIndex(topDocs, result.getShardIndex()); + topDocsList.add(topDocs); + } + ScoreDoc[] sortedDocs = SearchPhaseController.sortDocs(true, topDocsList, from, size, reducedCompletionSuggestions).scoreDocs(); + assertThat(sortedDocs.length, equalTo(accumulatedLength)); + } finally { + results.asList().forEach(TransportMessage::decRef); } - ScoreDoc[] sortedDocs = SearchPhaseController.sortDocs(true, topDocsList, from, size, reducedCompletionSuggestions).scoreDocs(); - assertThat(sortedDocs.length, equalTo(accumulatedLength)); } public void testSortDocsIsIdempotent() throws Exception { @@ -190,36 +195,45 @@ public void testSortDocsIsIdempotent() throws Exception { queryResultSize, useConstantScore ); + List topDocsList = new ArrayList<>(); boolean ignoreFrom = randomBoolean(); - Optional first = results.asList().stream().findFirst(); int from = 0, size = 0; - if (first.isPresent()) { - from = first.get().queryResult().from(); - size = first.get().queryResult().size(); - } - List topDocsList = new ArrayList<>(); - for (SearchPhaseResult result : results.asList()) { - QuerySearchResult queryResult = result.queryResult(); - TopDocs topDocs = queryResult.consumeTopDocs().topDocs; - topDocsList.add(topDocs); - SearchPhaseController.setShardIndex(topDocs, result.getShardIndex()); + ScoreDoc[] sortedDocs; + try { + Optional first = results.asList().stream().findFirst(); + if (first.isPresent()) { + from = first.get().queryResult().from(); + size = first.get().queryResult().size(); + } + for (SearchPhaseResult result : results.asList()) { + QuerySearchResult queryResult = result.queryResult(); + TopDocs topDocs = queryResult.consumeTopDocs().topDocs; + topDocsList.add(topDocs); + SearchPhaseController.setShardIndex(topDocs, result.getShardIndex()); + } + sortedDocs = 
SearchPhaseController.sortDocs(ignoreFrom, topDocsList, from, size, Collections.emptyList()).scoreDocs(); + } finally { + results.asList().forEach(TransportMessage::decRef); } - ScoreDoc[] sortedDocs = SearchPhaseController.sortDocs(ignoreFrom, topDocsList, from, size, Collections.emptyList()).scoreDocs(); - results = generateSeededQueryResults(randomSeed, nShards, Collections.emptyList(), queryResultSize, useConstantScore); - topDocsList = new ArrayList<>(); - for (SearchPhaseResult result : results.asList()) { - QuerySearchResult queryResult = result.queryResult(); - TopDocs topDocs = queryResult.consumeTopDocs().topDocs; - topDocsList.add(topDocs); - SearchPhaseController.setShardIndex(topDocs, result.getShardIndex()); - } - ScoreDoc[] sortedDocs2 = SearchPhaseController.sortDocs(ignoreFrom, topDocsList, from, size, Collections.emptyList()).scoreDocs(); - assertEquals(sortedDocs.length, sortedDocs2.length); - for (int i = 0; i < sortedDocs.length; i++) { - assertEquals(sortedDocs[i].doc, sortedDocs2[i].doc); - assertEquals(sortedDocs[i].shardIndex, sortedDocs2[i].shardIndex); - assertEquals(sortedDocs[i].score, sortedDocs2[i].score, 0.0f); + try { + topDocsList = new ArrayList<>(); + for (SearchPhaseResult result : results.asList()) { + QuerySearchResult queryResult = result.queryResult(); + TopDocs topDocs = queryResult.consumeTopDocs().topDocs; + topDocsList.add(topDocs); + SearchPhaseController.setShardIndex(topDocs, result.getShardIndex()); + } + ScoreDoc[] sortedDocs2 = SearchPhaseController.sortDocs(ignoreFrom, topDocsList, from, size, Collections.emptyList()) + .scoreDocs(); + assertEquals(sortedDocs.length, sortedDocs2.length); + for (int i = 0; i < sortedDocs.length; i++) { + assertEquals(sortedDocs[i].doc, sortedDocs2[i].doc); + assertEquals(sortedDocs[i].shardIndex, sortedDocs2[i].shardIndex); + assertEquals(sortedDocs[i].score, sortedDocs2[i].score, 0.0f); + } + } finally { + results.asList().forEach(TransportMessage::decRef); } } @@ -257,77 +271,87 @@ public void testMerge() { profile, false ); - SearchPhaseController.ReducedQueryPhase reducedQueryPhase = SearchPhaseController.reducedQueryPhase( - queryResults.asList(), - new ArrayList<>(), - new ArrayList<>(), - new TopDocsStats(trackTotalHits), - 0, - true, - InternalAggregationTestCase.emptyReduceContextBuilder(), - null, - true - ); - List shards = queryResults.asList().stream().map(SearchPhaseResult::getSearchShardTarget).collect(toList()); - AtomicArray fetchResults = generateFetchResults( - shards, - reducedQueryPhase.sortedTopDocs().scoreDocs(), - reducedQueryPhase.suggest(), - profile - ); - InternalSearchResponse mergedResponse = SearchPhaseController.merge( - false, - reducedQueryPhase, - fetchResults.asList(), - fetchResults::get - ); - if (trackTotalHits == SearchContext.TRACK_TOTAL_HITS_DISABLED) { - assertNull(mergedResponse.hits.getTotalHits()); - } else { - assertThat(mergedResponse.hits.getTotalHits().value, equalTo(0L)); - assertEquals(mergedResponse.hits.getTotalHits().relation, Relation.EQUAL_TO); - } - for (SearchHit hit : mergedResponse.hits().getHits()) { - SearchPhaseResult searchPhaseResult = fetchResults.get(hit.getShard().getShardId().id()); - assertSame(searchPhaseResult.getSearchShardTarget(), hit.getShard()); - } - int suggestSize = 0; - for (Suggest.Suggestion s : reducedQueryPhase.suggest()) { - suggestSize += s.getEntries().stream().mapToInt(e -> e.getOptions().size()).sum(); - } - assertThat(suggestSize, lessThanOrEqualTo(maxSuggestSize)); - 
assertThat(mergedResponse.hits().getHits().length, equalTo(reducedQueryPhase.sortedTopDocs().scoreDocs().length - suggestSize)); - Suggest suggestResult = mergedResponse.suggest(); - for (Suggest.Suggestion suggestion : reducedQueryPhase.suggest()) { - assertThat(suggestion, instanceOf(CompletionSuggestion.class)); - if (suggestion.getEntries().get(0).getOptions().size() > 0) { - CompletionSuggestion suggestionResult = suggestResult.getSuggestion(suggestion.getName()); - assertNotNull(suggestionResult); - List options = suggestionResult.getEntries().get(0).getOptions(); - assertThat(options.size(), equalTo(suggestion.getEntries().get(0).getOptions().size())); - for (CompletionSuggestion.Entry.Option option : options) { - assertNotNull(option.getHit()); - SearchPhaseResult searchPhaseResult = fetchResults.get(option.getHit().getShard().getShardId().id()); - assertSame(searchPhaseResult.getSearchShardTarget(), option.getHit().getShard()); - } - } - } - if (profile) { - assertThat(mergedResponse.profile().entrySet(), hasSize(nShards)); - assertThat( - // All shards should have a query profile - mergedResponse.profile().toString(), - mergedResponse.profile().values().stream().filter(r -> r.getQueryProfileResults() != null).count(), - equalTo((long) nShards) + try { + SearchPhaseController.ReducedQueryPhase reducedQueryPhase = SearchPhaseController.reducedQueryPhase( + queryResults.asList(), + new ArrayList<>(), + new ArrayList<>(), + new TopDocsStats(trackTotalHits), + 0, + true, + InternalAggregationTestCase.emptyReduceContextBuilder(), + null, + true + ); + List shards = queryResults.asList() + .stream() + .map(SearchPhaseResult::getSearchShardTarget) + .collect(toList()); + AtomicArray fetchResults = generateFetchResults( + shards, + reducedQueryPhase.sortedTopDocs().scoreDocs(), + reducedQueryPhase.suggest(), + profile + ); + InternalSearchResponse mergedResponse = SearchPhaseController.merge( + false, + reducedQueryPhase, + fetchResults.asList(), + fetchResults::get ); + if (trackTotalHits == SearchContext.TRACK_TOTAL_HITS_DISABLED) { + assertNull(mergedResponse.hits.getTotalHits()); + } else { + assertThat(mergedResponse.hits.getTotalHits().value, equalTo(0L)); + assertEquals(mergedResponse.hits.getTotalHits().relation, Relation.EQUAL_TO); + } + for (SearchHit hit : mergedResponse.hits().getHits()) { + SearchPhaseResult searchPhaseResult = fetchResults.get(hit.getShard().getShardId().id()); + assertSame(searchPhaseResult.getSearchShardTarget(), hit.getShard()); + } + int suggestSize = 0; + for (Suggest.Suggestion s : reducedQueryPhase.suggest()) { + suggestSize += s.getEntries().stream().mapToInt(e -> e.getOptions().size()).sum(); + } + assertThat(suggestSize, lessThanOrEqualTo(maxSuggestSize)); assertThat( - // Some or all shards should have a fetch profile - mergedResponse.profile().toString(), - mergedResponse.profile().values().stream().filter(r -> r.getFetchPhase() != null).count(), - both(greaterThan(0L)).and(lessThanOrEqualTo((long) nShards)) + mergedResponse.hits().getHits().length, + equalTo(reducedQueryPhase.sortedTopDocs().scoreDocs().length - suggestSize) ); - } else { - assertThat(mergedResponse.profile(), is(anEmptyMap())); + Suggest suggestResult = mergedResponse.suggest(); + for (Suggest.Suggestion suggestion : reducedQueryPhase.suggest()) { + assertThat(suggestion, instanceOf(CompletionSuggestion.class)); + if (suggestion.getEntries().get(0).getOptions().size() > 0) { + CompletionSuggestion suggestionResult = suggestResult.getSuggestion(suggestion.getName()); + 
assertNotNull(suggestionResult); + List options = suggestionResult.getEntries().get(0).getOptions(); + assertThat(options.size(), equalTo(suggestion.getEntries().get(0).getOptions().size())); + for (CompletionSuggestion.Entry.Option option : options) { + assertNotNull(option.getHit()); + SearchPhaseResult searchPhaseResult = fetchResults.get(option.getHit().getShard().getShardId().id()); + assertSame(searchPhaseResult.getSearchShardTarget(), option.getHit().getShard()); + } + } + } + if (profile) { + assertThat(mergedResponse.profile().entrySet(), hasSize(nShards)); + assertThat( + // All shards should have a query profile + mergedResponse.profile().toString(), + mergedResponse.profile().values().stream().filter(r -> r.getQueryProfileResults() != null).count(), + equalTo((long) nShards) + ); + assertThat( + // Some or all shards should have a fetch profile + mergedResponse.profile().toString(), + mergedResponse.profile().values().stream().filter(r -> r.getFetchPhase() != null).count(), + both(greaterThan(0L)).and(lessThanOrEqualTo((long) nShards)) + ); + } else { + assertThat(mergedResponse.profile(), is(anEmptyMap())); + } + } finally { + queryResults.asList().forEach(TransportMessage::decRef); } } } @@ -337,70 +361,80 @@ public void testMergeWithRank() { int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2); for (int trackTotalHits : new int[] { SearchContext.TRACK_TOTAL_HITS_DISABLED, SearchContext.TRACK_TOTAL_HITS_ACCURATE }) { AtomicArray queryResults = generateQueryResults(nShards, List.of(), queryResultSize, false, false, true); - SearchPhaseController.ReducedQueryPhase reducedQueryPhase = SearchPhaseController.reducedQueryPhase( - queryResults.asList(), - new ArrayList<>(), - new ArrayList<>(), - new TopDocsStats(trackTotalHits), - 0, - true, - InternalAggregationTestCase.emptyReduceContextBuilder(), - new RankCoordinatorContext(randomIntBetween(1, 10), 0, randomIntBetween(11, 100)) { - @Override - public SearchPhaseController.SortedTopDocs rank(List querySearchResults, TopDocsStats topDocStats) { - PriorityQueue queue = new PriorityQueue(windowSize) { - @Override - protected boolean lessThan(RankDoc a, RankDoc b) { - return a.score < b.score; - } - }; - for (QuerySearchResult qsr : querySearchResults) { - RankShardResult rsr = qsr.getRankShardResult(); - if (rsr != null) { - for (RankDoc rd : ((TestRankShardResult) rsr).testRankDocs) { - queue.insertWithOverflow(rd); + try { + SearchPhaseController.ReducedQueryPhase reducedQueryPhase = SearchPhaseController.reducedQueryPhase( + queryResults.asList(), + new ArrayList<>(), + new ArrayList<>(), + new TopDocsStats(trackTotalHits), + 0, + true, + InternalAggregationTestCase.emptyReduceContextBuilder(), + new RankCoordinatorContext(randomIntBetween(1, 10), 0, randomIntBetween(11, 100)) { + @Override + public SearchPhaseController.SortedTopDocs rank( + List querySearchResults, + TopDocsStats topDocStats + ) { + PriorityQueue queue = new PriorityQueue(windowSize) { + @Override + protected boolean lessThan(RankDoc a, RankDoc b) { + return a.score < b.score; + } + }; + for (QuerySearchResult qsr : querySearchResults) { + RankShardResult rsr = qsr.getRankShardResult(); + if (rsr != null) { + for (RankDoc rd : ((TestRankShardResult) rsr).testRankDocs) { + queue.insertWithOverflow(rd); + } } } + int size = Math.min(this.size, queue.size()); + RankDoc[] topResults = new RankDoc[size]; + for (int rdi = 0; rdi < size; ++rdi) { + topResults[rdi] = queue.pop(); + topResults[rdi].rank = rdi + 1; + } + topDocStats.fetchHits = 
topResults.length; + return new SearchPhaseController.SortedTopDocs(topResults, false, null, null, null, 0); } - int size = Math.min(this.size, queue.size()); - RankDoc[] topResults = new RankDoc[size]; - for (int rdi = 0; rdi < size; ++rdi) { - topResults[rdi] = queue.pop(); - topResults[rdi].rank = rdi + 1; - } - topDocStats.fetchHits = topResults.length; - return new SearchPhaseController.SortedTopDocs(topResults, false, null, null, null, 0); - } - }, - true - ); - List shards = queryResults.asList().stream().map(SearchPhaseResult::getSearchShardTarget).collect(toList()); - AtomicArray fetchResults = generateFetchResults( - shards, - reducedQueryPhase.sortedTopDocs().scoreDocs(), - reducedQueryPhase.suggest(), - false - ); - InternalSearchResponse mergedResponse = SearchPhaseController.merge( - false, - reducedQueryPhase, - fetchResults.asList(), - fetchResults::get - ); - if (trackTotalHits == SearchContext.TRACK_TOTAL_HITS_DISABLED) { - assertNull(mergedResponse.hits.getTotalHits()); - } else { - assertThat(mergedResponse.hits.getTotalHits().value, equalTo(0L)); - assertEquals(mergedResponse.hits.getTotalHits().relation, Relation.EQUAL_TO); - } - int rank = 1; - for (SearchHit hit : mergedResponse.hits().getHits()) { - SearchPhaseResult searchPhaseResult = fetchResults.get(hit.getShard().getShardId().id()); - assertSame(searchPhaseResult.getSearchShardTarget(), hit.getShard()); - assertEquals(rank++, hit.getRank()); + }, + true + ); + List shards = queryResults.asList() + .stream() + .map(SearchPhaseResult::getSearchShardTarget) + .collect(toList()); + AtomicArray fetchResults = generateFetchResults( + shards, + reducedQueryPhase.sortedTopDocs().scoreDocs(), + reducedQueryPhase.suggest(), + false + ); + InternalSearchResponse mergedResponse = SearchPhaseController.merge( + false, + reducedQueryPhase, + fetchResults.asList(), + fetchResults::get + ); + if (trackTotalHits == SearchContext.TRACK_TOTAL_HITS_DISABLED) { + assertNull(mergedResponse.hits.getTotalHits()); + } else { + assertThat(mergedResponse.hits.getTotalHits().value, equalTo(0L)); + assertEquals(mergedResponse.hits.getTotalHits().relation, Relation.EQUAL_TO); + } + int rank = 1; + for (SearchHit hit : mergedResponse.hits().getHits()) { + SearchPhaseResult searchPhaseResult = fetchResults.get(hit.getShard().getShardId().id()); + assertSame(searchPhaseResult.getSearchShardTarget(), hit.getShard()); + assertEquals(rank++, hit.getRank()); + } + assertThat(mergedResponse.hits().getHits().length, equalTo(reducedQueryPhase.sortedTopDocs().scoreDocs().length)); + assertThat(mergedResponse.profile(), is(anEmptyMap())); + } finally { + queryResults.asList().forEach(TransportMessage::decRef); } - assertThat(mergedResponse.hits().getHits().length, equalTo(reducedQueryPhase.sortedTopDocs().scoreDocs().length)); - assertThat(mergedResponse.profile(), is(anEmptyMap())); } } @@ -602,51 +636,63 @@ private void consumerTestCase(int numEmptyResponses) throws Exception { new SearchShardTarget("node", new ShardId("a", "b", 0), null), null ); - result.topDocs( - new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN), - new DocValueFormat[0] - ); - InternalAggregations aggs = InternalAggregations.from(singletonList(new Max("test", 1.0D, DocValueFormat.RAW, emptyMap()))); - result.aggregations(aggs); - result.setShardIndex(0); - consumer.consumeResult(result, latch::countDown); - + try { + result.topDocs( + new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new 
ScoreDoc[0]), Float.NaN), + new DocValueFormat[0] + ); + InternalAggregations aggs = InternalAggregations.from(singletonList(new Max("test", 1.0D, DocValueFormat.RAW, emptyMap()))); + result.aggregations(aggs); + result.setShardIndex(0); + consumer.consumeResult(result, latch::countDown); + } finally { + result.decRef(); + } result = new QuerySearchResult( new ShardSearchContextId("", 1), new SearchShardTarget("node", new ShardId("a", "b", 0), null), null ); - result.topDocs( - new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN), - new DocValueFormat[0] - ); - aggs = InternalAggregations.from(singletonList(new Max("test", 3.0D, DocValueFormat.RAW, emptyMap()))); - result.aggregations(aggs); - result.setShardIndex(2); - consumer.consumeResult(result, latch::countDown); - + try { + result.topDocs( + new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN), + new DocValueFormat[0] + ); + InternalAggregations aggs = InternalAggregations.from(singletonList(new Max("test", 3.0D, DocValueFormat.RAW, emptyMap()))); + result.aggregations(aggs); + result.setShardIndex(2); + consumer.consumeResult(result, latch::countDown); + } finally { + result.decRef(); + } result = new QuerySearchResult( new ShardSearchContextId("", 1), new SearchShardTarget("node", new ShardId("a", "b", 0), null), null ); - result.topDocs( - new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN), - new DocValueFormat[0] - ); - aggs = InternalAggregations.from(singletonList(new Max("test", 2.0D, DocValueFormat.RAW, emptyMap()))); - result.aggregations(aggs); - result.setShardIndex(1); - consumer.consumeResult(result, latch::countDown); - + try { + result.topDocs( + new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), Float.NaN), + new DocValueFormat[0] + ); + InternalAggregations aggs = InternalAggregations.from(singletonList(new Max("test", 2.0D, DocValueFormat.RAW, emptyMap()))); + result.aggregations(aggs); + result.setShardIndex(1); + consumer.consumeResult(result, latch::countDown); + } finally { + result.decRef(); + } while (numEmptyResponses > 0) { result = QuerySearchResult.nullInstance(); - int shardId = 2 + numEmptyResponses; - result.setShardIndex(shardId); - result.setSearchShardTarget(new SearchShardTarget("node", new ShardId("a", "b", shardId), null)); - consumer.consumeResult(result, latch::countDown); + try { + int shardId = 2 + numEmptyResponses; + result.setShardIndex(shardId); + result.setSearchShardTarget(new SearchShardTarget("node", new ShardId("a", "b", shardId), null)); + consumer.consumeResult(result, latch::countDown); + } finally { + result.decRef(); + } numEmptyResponses--; - } latch.await(); final int numTotalReducePhases; @@ -707,20 +753,24 @@ public void testConsumerConcurrently() throws Exception { new SearchShardTarget("node", new ShardId("a", "b", id), null), null ); - result.topDocs( - new TopDocsAndMaxScore( - new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(0, number) }), - number - ), - new DocValueFormat[0] - ); - InternalAggregations aggs = InternalAggregations.from( - Collections.singletonList(new Max("test", (double) number, DocValueFormat.RAW, Collections.emptyMap())) - ); - result.aggregations(aggs); - result.setShardIndex(id); - result.size(1); - consumer.consumeResult(result, latch::countDown); + try { + result.topDocs( + new 
TopDocsAndMaxScore( + new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(0, number) }), + number + ), + new DocValueFormat[0] + ); + InternalAggregations aggs = InternalAggregations.from( + Collections.singletonList(new Max("test", (double) number, DocValueFormat.RAW, Collections.emptyMap())) + ); + result.aggregations(aggs); + result.setShardIndex(id); + result.size(1); + consumer.consumeResult(result, latch::countDown); + } finally { + result.decRef(); + } }); threads[i].start(); @@ -769,17 +819,21 @@ public void testConsumerOnlyAggs() throws Exception { new SearchShardTarget("node", new ShardId("a", "b", i), null), null ); - result.topDocs( - new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), number), - new DocValueFormat[0] - ); - InternalAggregations aggs = InternalAggregations.from( - Collections.singletonList(new Max("test", (double) number, DocValueFormat.RAW, Collections.emptyMap())) - ); - result.aggregations(aggs); - result.setShardIndex(i); - result.size(1); - consumer.consumeResult(result, latch::countDown); + try { + result.topDocs( + new TopDocsAndMaxScore(new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]), number), + new DocValueFormat[0] + ); + InternalAggregations aggs = InternalAggregations.from( + Collections.singletonList(new Max("test", (double) number, DocValueFormat.RAW, Collections.emptyMap())) + ); + result.aggregations(aggs); + result.setShardIndex(i); + result.size(1); + consumer.consumeResult(result, latch::countDown); + } finally { + result.decRef(); + } } latch.await(); @@ -823,16 +877,20 @@ public void testConsumerOnlyHits() throws Exception { new SearchShardTarget("node", new ShardId("a", "b", i), null), null ); - result.topDocs( - new TopDocsAndMaxScore( - new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(0, number) }), - number - ), - new DocValueFormat[0] - ); - result.setShardIndex(i); - result.size(1); - consumer.consumeResult(result, latch::countDown); + try { + result.topDocs( + new TopDocsAndMaxScore( + new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(0, number) }), + number + ), + new DocValueFormat[0] + ); + result.setShardIndex(i); + result.size(1); + consumer.consumeResult(result, latch::countDown); + } finally { + result.decRef(); + } } latch.await(); SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce(); @@ -880,18 +938,22 @@ public void testReduceTopNWithFromOffset() throws Exception { new SearchShardTarget("node", new ShardId("a", "b", i), null), null ); - ScoreDoc[] docs = new ScoreDoc[3]; - for (int j = 0; j < docs.length; j++) { - docs[j] = new ScoreDoc(0, score--); + try { + ScoreDoc[] docs = new ScoreDoc[3]; + for (int j = 0; j < docs.length; j++) { + docs[j] = new ScoreDoc(0, score--); + } + result.topDocs( + new TopDocsAndMaxScore(new TopDocs(new TotalHits(3, TotalHits.Relation.EQUAL_TO), docs), docs[0].score), + new DocValueFormat[0] + ); + result.setShardIndex(i); + result.size(5); + result.from(5); + consumer.consumeResult(result, latch::countDown); + } finally { + result.decRef(); } - result.topDocs( - new TopDocsAndMaxScore(new TopDocs(new TotalHits(3, TotalHits.Relation.EQUAL_TO), docs), docs[0].score), - new DocValueFormat[0] - ); - result.setShardIndex(i); - result.size(5); - result.from(5); - consumer.consumeResult(result, latch::countDown); } latch.await(); // 4*3 results = 12 we get result 5 to 10 here with from=5 and 
size=5 @@ -936,10 +998,14 @@ public void testConsumerSortByField() throws Exception { new SearchShardTarget("node", new ShardId("a", "b", i), null), null ); - result.topDocs(new TopDocsAndMaxScore(topDocs, Float.NaN), docValueFormats); - result.setShardIndex(i); - result.size(size); - consumer.consumeResult(result, latch::countDown); + try { + result.topDocs(new TopDocsAndMaxScore(topDocs, Float.NaN), docValueFormats); + result.setShardIndex(i); + result.size(size); + consumer.consumeResult(result, latch::countDown); + } finally { + result.decRef(); + } } latch.await(); SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce(); @@ -986,10 +1052,14 @@ public void testConsumerFieldCollapsing() throws Exception { new SearchShardTarget("node", new ShardId("a", "b", i), null), null ); - result.topDocs(new TopDocsAndMaxScore(topDocs, Float.NaN), docValueFormats); - result.setShardIndex(i); - result.size(size); - consumer.consumeResult(result, latch::countDown); + try { + result.topDocs(new TopDocsAndMaxScore(topDocs, Float.NaN), docValueFormats); + result.setShardIndex(i); + result.size(size); + consumer.consumeResult(result, latch::countDown); + } finally { + result.decRef(); + } } latch.await(); SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce(); @@ -1031,55 +1101,59 @@ public void testConsumerSuggestions() throws Exception { new SearchShardTarget("node", new ShardId("a", "b", i), null), null ); - List>> suggestions = - new ArrayList<>(); - { - TermSuggestion termSuggestion = new TermSuggestion("term", 1, SortBy.SCORE); - TermSuggestion.Entry entry = new TermSuggestion.Entry(new Text("entry"), 0, 10); - int numOptions = randomIntBetween(1, 10); - for (int j = 0; j < numOptions; j++) { - int score = numOptions - j; - maxScoreTerm = Math.max(maxScoreTerm, score); - entry.addOption(new TermSuggestion.Entry.Option(new Text("option"), randomInt(), score)); + try { + List>> suggestions = + new ArrayList<>(); + { + TermSuggestion termSuggestion = new TermSuggestion("term", 1, SortBy.SCORE); + TermSuggestion.Entry entry = new TermSuggestion.Entry(new Text("entry"), 0, 10); + int numOptions = randomIntBetween(1, 10); + for (int j = 0; j < numOptions; j++) { + int score = numOptions - j; + maxScoreTerm = Math.max(maxScoreTerm, score); + entry.addOption(new TermSuggestion.Entry.Option(new Text("option"), randomInt(), score)); + } + termSuggestion.addTerm(entry); + suggestions.add(termSuggestion); } - termSuggestion.addTerm(entry); - suggestions.add(termSuggestion); - } - { - PhraseSuggestion phraseSuggestion = new PhraseSuggestion("phrase", 1); - PhraseSuggestion.Entry entry = new PhraseSuggestion.Entry(new Text("entry"), 0, 10); - int numOptions = randomIntBetween(1, 10); - for (int j = 0; j < numOptions; j++) { - int score = numOptions - j; - maxScorePhrase = Math.max(maxScorePhrase, score); - entry.addOption(new PhraseSuggestion.Entry.Option(new Text("option"), new Text("option"), score)); + { + PhraseSuggestion phraseSuggestion = new PhraseSuggestion("phrase", 1); + PhraseSuggestion.Entry entry = new PhraseSuggestion.Entry(new Text("entry"), 0, 10); + int numOptions = randomIntBetween(1, 10); + for (int j = 0; j < numOptions; j++) { + int score = numOptions - j; + maxScorePhrase = Math.max(maxScorePhrase, score); + entry.addOption(new PhraseSuggestion.Entry.Option(new Text("option"), new Text("option"), score)); + } + phraseSuggestion.addTerm(entry); + suggestions.add(phraseSuggestion); } - phraseSuggestion.addTerm(entry); - suggestions.add(phraseSuggestion); - } - { - 
CompletionSuggestion completionSuggestion = new CompletionSuggestion("completion", 1, false);
-            CompletionSuggestion.Entry entry = new CompletionSuggestion.Entry(new Text("entry"), 0, 10);
-            int numOptions = randomIntBetween(1, 10);
-            for (int j = 0; j < numOptions; j++) {
-                int score = numOptions - j;
-                maxScoreCompletion = Math.max(maxScoreCompletion, score);
-                CompletionSuggestion.Entry.Option option = new CompletionSuggestion.Entry.Option(
-                    j,
-                    new Text("option"),
-                    score,
-                    Collections.emptyMap()
-                );
-                entry.addOption(option);
+                {
+                    CompletionSuggestion completionSuggestion = new CompletionSuggestion("completion", 1, false);
+                    CompletionSuggestion.Entry entry = new CompletionSuggestion.Entry(new Text("entry"), 0, 10);
+                    int numOptions = randomIntBetween(1, 10);
+                    for (int j = 0; j < numOptions; j++) {
+                        int score = numOptions - j;
+                        maxScoreCompletion = Math.max(maxScoreCompletion, score);
+                        CompletionSuggestion.Entry.Option option = new CompletionSuggestion.Entry.Option(
+                            j,
+                            new Text("option"),
+                            score,
+                            Collections.emptyMap()
+                        );
+                        entry.addOption(option);
+                    }
+                    completionSuggestion.addTerm(entry);
+                    suggestions.add(completionSuggestion);
                 }
-            completionSuggestion.addTerm(entry);
-            suggestions.add(completionSuggestion);
+                result.suggest(new Suggest(suggestions));
+                result.topDocs(new TopDocsAndMaxScore(Lucene.EMPTY_TOP_DOCS, Float.NaN), new DocValueFormat[0]);
+                result.setShardIndex(i);
+                result.size(0);
+                consumer.consumeResult(result, latch::countDown);
+            } finally {
+                result.decRef();
             }
-            result.suggest(new Suggest(suggestions));
-            result.topDocs(new TopDocsAndMaxScore(Lucene.EMPTY_TOP_DOCS, Float.NaN), new DocValueFormat[0]);
-            result.setShardIndex(i);
-            result.size(0);
-            consumer.consumeResult(result, latch::countDown);
         }
         latch.await();
         SearchPhaseController.ReducedQueryPhase reduce = consumer.reduce();
@@ -1173,20 +1247,24 @@ public void onFinalReduce(List<SearchShard> shards, TotalHits totalHits, Interna
                     new SearchShardTarget("node", new ShardId("a", "b", id), null),
                     null
                 );
-                result.topDocs(
-                    new TopDocsAndMaxScore(
-                        new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(0, number) }),
-                        number
-                    ),
-                    new DocValueFormat[0]
-                );
-                InternalAggregations aggs = InternalAggregations.from(
-                    Collections.singletonList(new Max("test", (double) number, DocValueFormat.RAW, Collections.emptyMap()))
-                );
-                result.aggregations(aggs);
-                result.setShardIndex(id);
-                result.size(1);
-                consumer.consumeResult(result, latch::countDown);
+                try {
+                    result.topDocs(
+                        new TopDocsAndMaxScore(
+                            new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(0, number) }),
+                            number
+                        ),
+                        new DocValueFormat[0]
+                    );
+                    InternalAggregations aggs = InternalAggregations.from(
+                        Collections.singletonList(new Max("test", (double) number, DocValueFormat.RAW, Collections.emptyMap()))
+                    );
+                    result.aggregations(aggs);
+                    result.setShardIndex(id);
+                    result.size(1);
+                    consumer.consumeResult(result, latch::countDown);
+                } finally {
+                    result.decRef();
+                }
             });
             threads[i].start();
         }
@@ -1253,17 +1331,24 @@ private void testReduceCase(int numShards, int bufferSize, boolean shouldFail) t
                     new SearchShardTarget("node", new ShardId("a", "b", index), null),
                     null
                 );
-                result.topDocs(
-                    new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), Lucene.EMPTY_SCORE_DOCS), Float.NaN),
-                    new DocValueFormat[0]
-                );
-                InternalAggregations aggs = InternalAggregations.from(
-                    Collections.singletonList(new Max("test", 0d, DocValueFormat.RAW, Collections.emptyMap()))
-                );
-                result.aggregations(aggs);
-                result.setShardIndex(index);
-                result.size(1);
-                consumer.consumeResult(result, latch::countDown);
+                try {
+                    result.topDocs(
+                        new TopDocsAndMaxScore(
+                            new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), Lucene.EMPTY_SCORE_DOCS),
+                            Float.NaN
+                        ),
+                        new DocValueFormat[0]
+                    );
+                    InternalAggregations aggs = InternalAggregations.from(
+                        Collections.singletonList(new Max("test", 0d, DocValueFormat.RAW, Collections.emptyMap()))
+                    );
+                    result.aggregations(aggs);
+                    result.setShardIndex(index);
+                    result.size(1);
+                    consumer.consumeResult(result, latch::countDown);
+                } finally {
+                    result.decRef();
+                }
             });
             threads[index].start();
         }
@@ -1314,14 +1399,21 @@ public void testFailConsumeAggs() throws Exception {
                 new SearchShardTarget("node", new ShardId("a", "b", index), null),
                 null
             );
-            result.topDocs(
-                new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), Lucene.EMPTY_SCORE_DOCS), Float.NaN),
-                new DocValueFormat[0]
-            );
-            result.aggregations(null);
-            result.setShardIndex(index);
-            result.size(1);
-            expectThrows(Exception.class, () -> consumer.consumeResult(result, () -> {}));
+            try {
+                result.topDocs(
+                    new TopDocsAndMaxScore(
+                        new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), Lucene.EMPTY_SCORE_DOCS),
+                        Float.NaN
+                    ),
+                    new DocValueFormat[0]
+                );
+                result.aggregations(null);
+                result.setShardIndex(index);
+                result.size(1);
+                expectThrows(Exception.class, () -> consumer.consumeResult(result, () -> {}));
+            } finally {
+                result.decRef();
+            }
         }
         assertNull(consumer.reduce().aggregations());
     }
diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java
index 6d5380273c8c8..7270326933dea 100644
--- a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java
+++ b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java
@@ -118,38 +118,55 @@ public void sendExecuteQuery(
                 new SearchShardTarget("node1", new ShardId("idx", "na", shardId), null),
                 null
             );
-            SortField sortField = new SortField("timestamp", SortField.Type.LONG);
-            if (withCollapse) {
-                queryResult.topDocs(
-                    new TopDocsAndMaxScore(
-                        new TopFieldGroups(
-                            "collapse_field",
-                            new TotalHits(1, withScroll ? TotalHits.Relation.EQUAL_TO : TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO),
-                            new FieldDoc[] { new FieldDoc(randomInt(1000), Float.NaN, new Object[] { request.shardId().id() }) },
-                            new SortField[] { sortField },
-                            new Object[] { 0L }
+            try {
+                SortField sortField = new SortField("timestamp", SortField.Type.LONG);
+                if (withCollapse) {
+                    queryResult.topDocs(
+                        new TopDocsAndMaxScore(
+                            new TopFieldGroups(
+                                "collapse_field",
+                                new TotalHits(
+                                    1,
+                                    withScroll ? TotalHits.Relation.EQUAL_TO : TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO
+                                ),
+                                new FieldDoc[] { new FieldDoc(randomInt(1000), Float.NaN, new Object[] { request.shardId().id() }) },
+                                new SortField[] { sortField },
+                                new Object[] { 0L }
+                            ),
+                            Float.NaN
                         ),
-                        Float.NaN
-                    ),
-                    new DocValueFormat[] { DocValueFormat.RAW }
-                );
-            } else {
-                queryResult.topDocs(
-                    new TopDocsAndMaxScore(
-                        new TopFieldDocs(
-                            new TotalHits(1, withScroll ? TotalHits.Relation.EQUAL_TO : TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO),
-                            new FieldDoc[] { new FieldDoc(randomInt(1000), Float.NaN, new Object[] { request.shardId().id() }) },
-                            new SortField[] { sortField }
+                        new DocValueFormat[] { DocValueFormat.RAW }
+                    );
+                } else {
+                    queryResult.topDocs(
+                        new TopDocsAndMaxScore(
+                            new TopFieldDocs(
+                                new TotalHits(
+                                    1,
+                                    withScroll ? TotalHits.Relation.EQUAL_TO : TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO
+                                ),
+                                new FieldDoc[] { new FieldDoc(randomInt(1000), Float.NaN, new Object[] { request.shardId().id() }) },
+                                new SortField[] { sortField }
+                            ),
+                            Float.NaN
                         ),
-                        Float.NaN
-                    ),
-                    new DocValueFormat[] { DocValueFormat.RAW }
-                );
+                        new DocValueFormat[] { DocValueFormat.RAW }
+                    );
+                }
+                queryResult.from(0);
+                queryResult.size(1);
+                successfulOps.incrementAndGet();
+                queryResult.incRef();
+                new Thread(() -> {
+                    try {
+                        listener.onResponse(queryResult);
+                    } finally {
+                        queryResult.decRef();
+                    }
+                }).start();
+            } finally {
+                queryResult.decRef();
             }
-            queryResult.from(0);
-            queryResult.size(1);
-            successfulOps.incrementAndGet();
-            new Thread(() -> listener.onResponse(queryResult)).start();
         }
     };
     CountDownLatch latch = new CountDownLatch(1);
diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java
index d4ebf1f44182a..8f0444287d07e 100644
--- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java
@@ -382,16 +382,24 @@ public void onFailure(Exception e) {
                     new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()),
                     result
                 );
-                SearchPhaseResult searchPhaseResult = result.get();
-                List<Integer> intCursors = new ArrayList<>(1);
-                intCursors.add(0);
-                ShardFetchRequest req = new ShardFetchRequest(searchPhaseResult.getContextId(), intCursors, null/* not a scroll */);
-                PlainActionFuture<FetchSearchResult> listener = new PlainActionFuture<>();
-                service.executeFetchPhase(req, new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()), listener);
-                listener.get();
-                if (useScroll) {
-                    // have to free context since this test does not remove the index from IndicesService.
-                    service.freeReaderContext(searchPhaseResult.getContextId());
+                final SearchPhaseResult searchPhaseResult = result.get();
+                try {
+                    List<Integer> intCursors = new ArrayList<>(1);
+                    intCursors.add(0);
+                    ShardFetchRequest req = new ShardFetchRequest(
+                        searchPhaseResult.getContextId(),
+                        intCursors,
+                        null/* not a scroll */
+                    );
+                    PlainActionFuture<FetchSearchResult> listener = new PlainActionFuture<>();
+                    service.executeFetchPhase(req, new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()), listener);
+                    listener.get().decRef();
+                    if (useScroll) {
+                        // have to free context since this test does not remove the index from IndicesService.
+                        service.freeReaderContext(searchPhaseResult.getContextId());
+                    }
+                } finally {
+                    searchPhaseResult.decRef();
                 }
             } catch (ExecutionException ex) {
                 assertThat(ex.getCause(), instanceOf(RuntimeException.class));
@@ -1046,6 +1054,7 @@ public void onResponse(SearchPhaseResult searchPhaseResult) {
                     // make sure that the wrapper is called when the query is actually executed
                     assertEquals(6, numWrapInvocations.get());
                 } finally {
+                    searchPhaseResult.decRef();
                     latch.countDown();
                 }
             }
@@ -1360,6 +1369,7 @@ public void onResponse(SearchPhaseResult result) {
                     assertNotNull(result.queryResult().topDocs());
                     assertNotNull(result.queryResult().aggregations());
                 } finally {
+                    result.decRef();
                     latch.countDown();
                 }
             }
@@ -1390,6 +1400,7 @@ public void onResponse(SearchPhaseResult result) {
                     assertNotNull(result.queryResult().topDocs());
                     assertNotNull(result.queryResult().aggregations());
                 } finally {
+                    result.decRef();
                     latch.countDown();
                 }
             }
@@ -1418,6 +1429,7 @@ public void onResponse(SearchPhaseResult result) {
                     assertThat(result, instanceOf(QuerySearchResult.class));
                     assertTrue(result.queryResult().isNull());
                 } finally {
+                    result.decRef();
                     latch.countDown();
                 }
             }
@@ -1558,6 +1570,7 @@ public void testCancelQueryPhaseEarly() throws Exception {
             @Override
             public void onResponse(SearchPhaseResult searchPhaseResult) {
                 service.freeReaderContext(searchPhaseResult.getContextId());
+                searchPhaseResult.decRef();
                 latch1.countDown();
             }
@@ -1602,6 +1615,7 @@ public void onResponse(SearchPhaseResult searchPhaseResult) {
                     fail("Search not cancelled early");
                 } finally {
                     service.freeReaderContext(searchPhaseResult.getContextId());
+                    searchPhaseResult.decRef();
                     latch3.countDown();
                 }
             }
@@ -1728,7 +1742,11 @@ public void testWaitOnRefresh() {
         );
         service.executeQueryPhase(request, task, future);
         SearchPhaseResult searchPhaseResult = future.actionGet();
-        assertEquals(1, searchPhaseResult.queryResult().getTotalHits().value);
+        try {
+            assertEquals(1, searchPhaseResult.queryResult().getTotalHits().value);
+        } finally {
+            searchPhaseResult.decRef();
+        }
     }
 
     public void testWaitOnRefreshFailsWithRefreshesDisabled() {
@@ -1907,7 +1925,7 @@ public void testDfsQueryPhaseRewrite() {
             plainActionFuture
         );
-        plainActionFuture.actionGet();
+        plainActionFuture.actionGet().decRef();
         assertThat(((TestRewriteCounterQueryBuilder) request.source().query()).asyncRewriteCount, equalTo(1));
         final ShardSearchContextId contextId = context.id();
         assertTrue(service.freeReaderContext(contextId));
@@ -2068,114 +2086,37 @@ public void testSlicingBehaviourForParallelCollection() throws Exception {
         try (ReaderContext readerContext = createReaderContext(indexService, indexShard)) {
             SearchShardTask task = new SearchShardTask(0, "type", "action", "description", null, emptyMap());
             {
-                SearchContext searchContext = service.createContext(readerContext, request, task, ResultsType.DFS, true);
-                ContextIndexSearcher searcher = searchContext.searcher();
-                assertNotNull(searcher.getExecutor());
-
-                final int maxPoolSize = executor.getMaximumPoolSize();
-                assertEquals(
-                    "Sanity check to ensure this isn't the default of 1 when pool size is unset",
-                    configuredMaxPoolSize,
-                    maxPoolSize
-                );
+                try (SearchContext searchContext = service.createContext(readerContext, request, task, ResultsType.DFS, true)) {
+                    ContextIndexSearcher searcher = searchContext.searcher();
+                    assertNotNull(searcher.getExecutor());
 
-                final int expectedSlices = ContextIndexSearcher.computeSlices(searcher.getIndexReader().leaves(), maxPoolSize, 1).length;
-                assertNotEquals("Sanity check to ensure this isn't the default of 1 when pool size is unset", 1, expectedSlices);
-
-                final long priorExecutorTaskCount = executor.getCompletedTaskCount();
-                searcher.search(termQuery, new TotalHitCountCollectorManager());
-                assertBusy(
-                    () -> assertEquals(
-                        "DFS supports parallel collection, so the number of slices should be > 1.",
-                        expectedSlices,
-                        executor.getCompletedTaskCount() - priorExecutorTaskCount
-                    )
-                );
-            }
-            {
-                SearchContext searchContext = service.createContext(readerContext, request, task, ResultsType.QUERY, true);
-                ContextIndexSearcher searcher = searchContext.searcher();
-                assertNotNull(searcher.getExecutor());
-
-                final int maxPoolSize = executor.getMaximumPoolSize();
-                assertEquals(
-                    "Sanity check to ensure this isn't the default of 1 when pool size is unset",
-                    configuredMaxPoolSize,
-                    maxPoolSize
-                );
+                    final int maxPoolSize = executor.getMaximumPoolSize();
+                    assertEquals(
+                        "Sanity check to ensure this isn't the default of 1 when pool size is unset",
+                        configuredMaxPoolSize,
+                        maxPoolSize
+                    );
 
-                final int expectedSlices = ContextIndexSearcher.computeSlices(searcher.getIndexReader().leaves(), maxPoolSize, 1).length;
-                assertNotEquals("Sanity check to ensure this isn't the default of 1 when pool size is unset", 1, expectedSlices);
-
-                final long priorExecutorTaskCount = executor.getCompletedTaskCount();
-                searcher.search(termQuery, new TotalHitCountCollectorManager());
-                assertBusy(
-                    () -> assertEquals(
-                        "QUERY supports parallel collection when enabled, so the number of slices should be > 1.",
-                        expectedSlices,
-                        executor.getCompletedTaskCount() - priorExecutorTaskCount
-                    )
-                );
-            }
-            {
-                SearchContext searchContext = service.createContext(readerContext, request, task, ResultsType.FETCH, true);
-                ContextIndexSearcher searcher = searchContext.searcher();
-                assertNotNull(searcher.getExecutor());
-                final long priorExecutorTaskCount = executor.getCompletedTaskCount();
-                searcher.search(termQuery, new TotalHitCountCollectorManager());
-                assertBusy(
-                    () -> assertEquals(
-                        "The number of slices should be 1 as FETCH does not support parallel collection.",
-                        1,
-                        executor.getCompletedTaskCount() - priorExecutorTaskCount
-                    )
-                );
-            }
-            {
-                SearchContext searchContext = service.createContext(readerContext, request, task, ResultsType.NONE, true);
-                ContextIndexSearcher searcher = searchContext.searcher();
-                assertNotNull(searcher.getExecutor());
-                final long priorExecutorTaskCount = executor.getCompletedTaskCount();
-                searcher.search(termQuery, new TotalHitCountCollectorManager());
-                assertBusy(
-                    () -> assertEquals(
-                        "The number of slices should be 1 as NONE does not support parallel collection.",
-                        1,
-                        executor.getCompletedTaskCount() - priorExecutorTaskCount
-                    )
-                );
-            }
+                    final int expectedSlices = ContextIndexSearcher.computeSlices(
+                        searcher.getIndexReader().leaves(),
+                        maxPoolSize,
+                        1
+                    ).length;
+                    assertNotEquals("Sanity check to ensure this isn't the default of 1 when pool size is unset", 1, expectedSlices);
 
-            try {
-                ClusterUpdateSettingsResponse response = client().admin()
-                    .cluster()
-                    .prepareUpdateSettings()
-                    .setPersistentSettings(Settings.builder().put(QUERY_PHASE_PARALLEL_COLLECTION_ENABLED.getKey(), false).build())
-                    .get();
-                assertTrue(response.isAcknowledged());
-                {
-                    SearchContext searchContext = service.createContext(readerContext, request, task, ResultsType.QUERY, true);
-                    ContextIndexSearcher searcher = searchContext.searcher();
-                    assertNotNull(searcher.getExecutor());
                     final long priorExecutorTaskCount = executor.getCompletedTaskCount();
                     searcher.search(termQuery, new TotalHitCountCollectorManager());
                     assertBusy(
                         () -> assertEquals(
-                            "The number of slices should be 1 when QUERY parallel collection is disabled.",
-                            1,
+                            "DFS supports parallel collection, so the number of slices should be > 1.",
+                            expectedSlices,
                             executor.getCompletedTaskCount() - priorExecutorTaskCount
                         )
                     );
                 }
-            } finally {
-                // Reset to the original default setting and check to ensure it takes effect.
-                client().admin()
-                    .cluster()
-                    .prepareUpdateSettings()
-                    .setPersistentSettings(Settings.builder().putNull(QUERY_PHASE_PARALLEL_COLLECTION_ENABLED.getKey()).build())
-                    .get();
-                {
-                    SearchContext searchContext = service.createContext(readerContext, request, task, ResultsType.QUERY, true);
+            }
+            {
+                try (SearchContext searchContext = service.createContext(readerContext, request, task, ResultsType.QUERY, true)) {
                     ContextIndexSearcher searcher = searchContext.searcher();
                     assertNotNull(searcher.getExecutor());
@@ -2204,6 +2145,97 @@ public void testSlicingBehaviourForParallelCollection() throws Exception {
                     );
                 }
             }
+            {
+                try (SearchContext searchContext = service.createContext(readerContext, request, task, ResultsType.FETCH, true)) {
+                    ContextIndexSearcher searcher = searchContext.searcher();
+                    assertNotNull(searcher.getExecutor());
+                    final long priorExecutorTaskCount = executor.getCompletedTaskCount();
+                    searcher.search(termQuery, new TotalHitCountCollectorManager());
+                    assertBusy(
+                        () -> assertEquals(
+                            "The number of slices should be 1 as FETCH does not support parallel collection.",
+                            1,
+                            executor.getCompletedTaskCount() - priorExecutorTaskCount
+                        )
+                    );
+                }
+            }
+            {
+                try (SearchContext searchContext = service.createContext(readerContext, request, task, ResultsType.NONE, true)) {
+                    ContextIndexSearcher searcher = searchContext.searcher();
+                    assertNotNull(searcher.getExecutor());
+                    final long priorExecutorTaskCount = executor.getCompletedTaskCount();
+                    searcher.search(termQuery, new TotalHitCountCollectorManager());
+                    assertBusy(
+                        () -> assertEquals(
+                            "The number of slices should be 1 as NONE does not support parallel collection.",
+                            1,
+                            executor.getCompletedTaskCount() - priorExecutorTaskCount
+                        )
+                    );
+                }
+            }
+
+            try {
+                ClusterUpdateSettingsResponse response = client().admin()
+                    .cluster()
+                    .prepareUpdateSettings()
+                    .setPersistentSettings(Settings.builder().put(QUERY_PHASE_PARALLEL_COLLECTION_ENABLED.getKey(), false).build())
+                    .get();
+                assertTrue(response.isAcknowledged());
+                {
+                    try (SearchContext searchContext = service.createContext(readerContext, request, task, ResultsType.QUERY, true)) {
+                        ContextIndexSearcher searcher = searchContext.searcher();
+                        assertNotNull(searcher.getExecutor());
+                        final long priorExecutorTaskCount = executor.getCompletedTaskCount();
+                        searcher.search(termQuery, new TotalHitCountCollectorManager());
+                        assertBusy(
+                            () -> assertEquals(
+                                "The number of slices should be 1 when QUERY parallel collection is disabled.",
+                                1,
+                                executor.getCompletedTaskCount() - priorExecutorTaskCount
+                            )
+                        );
+                    }
+                }
+            } finally {
+                // Reset to the original default setting and check to ensure it takes effect.
+                client().admin()
+                    .cluster()
+                    .prepareUpdateSettings()
+                    .setPersistentSettings(Settings.builder().putNull(QUERY_PHASE_PARALLEL_COLLECTION_ENABLED.getKey()).build())
+                    .get();
+                {
+                    try (SearchContext searchContext = service.createContext(readerContext, request, task, ResultsType.QUERY, true)) {
+                        ContextIndexSearcher searcher = searchContext.searcher();
+                        assertNotNull(searcher.getExecutor());
+
+                        final int maxPoolSize = executor.getMaximumPoolSize();
+                        assertEquals(
+                            "Sanity check to ensure this isn't the default of 1 when pool size is unset",
+                            configuredMaxPoolSize,
+                            maxPoolSize
+                        );
+
+                        final int expectedSlices = ContextIndexSearcher.computeSlices(
+                            searcher.getIndexReader().leaves(),
+                            maxPoolSize,
+                            1
+                        ).length;
+                        assertNotEquals("Sanity check to ensure this isn't the default of 1 when pool size is unset", 1, expectedSlices);
+
+                        final long priorExecutorTaskCount = executor.getCompletedTaskCount();
+                        searcher.search(termQuery, new TotalHitCountCollectorManager());
+                        assertBusy(
+                            () -> assertEquals(
+                                "QUERY supports parallel collection when enabled, so the number of slices should be > 1.",
+                                expectedSlices,
+                                executor.getCompletedTaskCount() - priorExecutorTaskCount
+                            )
+                        );
+                    }
+                }
+            }
         }
     }
diff --git a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java
index 9569bd982363e..065a8bb22ab68 100644
--- a/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java
+++ b/server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java
@@ -1065,7 +1065,7 @@ public <C extends Collector, T> T search(Query query, CollectorManager<C, T> col
             }
         };
 
-        SearchContext context = new TestSearchContext(null, indexShard, searcher) {
+        try (SearchContext context = new TestSearchContext(null, indexShard, searcher) {
             @Override
             public Query buildFilteredQuery(Query query) {
                 return query;
@@ -1075,37 +1075,38 @@ public Query buildFilteredQuery(Query query) {
             public ReaderContext readerContext() {
                 return new ReaderContext(new ShardSearchContextId("test", 1L), null, indexShard, null, 0L, false);
             }
-        };
+        }) {
 
-        List<Query> queries = List.of(new TermQuery(new Term("field0", "term")), new TermQuery(new Term("field1", "term0")));
-        context.parsedQuery(
-            new ParsedQuery(new BooleanQuery.Builder().add(queries.get(0), Occur.SHOULD).add(queries.get(1), Occur.SHOULD).build())
-        );
-        context.rankShardContext(new RankShardContext(queries, 0, 100) {
-            @Override
-            public RankShardResult combine(List<TopDocs> rankResults) {
-                return null;
-            }
-        });
-
-        context.trackTotalHitsUpTo(SearchContext.TRACK_TOTAL_HITS_DISABLED);
-        context.aggregations(null);
-        QueryPhase.executeRank(context);
-        assertEquals(queries, executed);
-
-        executed.clear();
-        context.trackTotalHitsUpTo(100);
-        context.aggregations(null);
-        QueryPhase.executeRank(context);
-        assertEquals(context.rewrittenQuery(), executed.get(0));
-        assertEquals(queries, executed.subList(1, executed.size()));
-
-        executed.clear();
-        context.trackTotalHitsUpTo(SearchContext.TRACK_TOTAL_HITS_DISABLED);
-        context.aggregations(new SearchContextAggregations(AggregatorFactories.EMPTY, () -> null));
-        QueryPhase.executeRank(context);
-        assertEquals(context.rewrittenQuery(), executed.get(0));
-        assertEquals(queries, executed.subList(1, executed.size()));
+            List<Query> queries = List.of(new TermQuery(new Term("field0", "term")), new TermQuery(new Term("field1", "term0")));
+            context.parsedQuery(
+                new ParsedQuery(new BooleanQuery.Builder().add(queries.get(0), Occur.SHOULD).add(queries.get(1), Occur.SHOULD).build())
+            );
+            context.rankShardContext(new RankShardContext(queries, 0, 100) {
+                @Override
+                public RankShardResult combine(List<TopDocs> rankResults) {
+                    return null;
+                }
+            });
+
+            context.trackTotalHitsUpTo(SearchContext.TRACK_TOTAL_HITS_DISABLED);
+            context.aggregations(null);
+            QueryPhase.executeRank(context);
+            assertEquals(queries, executed);
+
+            executed.clear();
+            context.trackTotalHitsUpTo(100);
+            context.aggregations(null);
+            QueryPhase.executeRank(context);
+            assertEquals(context.rewrittenQuery(), executed.get(0));
+            assertEquals(queries, executed.subList(1, executed.size()));
+
+            executed.clear();
+            context.trackTotalHitsUpTo(SearchContext.TRACK_TOTAL_HITS_DISABLED);
+            context.aggregations(new SearchContextAggregations(AggregatorFactories.EMPTY, () -> null));
+            QueryPhase.executeRank(context);
+            assertEquals(context.rewrittenQuery(), executed.get(0));
+            assertEquals(queries, executed.subList(1, executed.size()));
+        }
     }
 
     private static final QueryCachingPolicy NEVER_CACHE_POLICY = new QueryCachingPolicy() {
diff --git a/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java b/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java
index c728bed5ed7bb..516ffeb9418bd 100644
--- a/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java
+++ b/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java
@@ -97,28 +97,36 @@ private static QuerySearchResult createTestInstance() throws Exception {
 
     public void testSerialization() throws Exception {
         QuerySearchResult querySearchResult = createTestInstance();
-        boolean delayed = randomBoolean();
-        QuerySearchResult deserialized = copyWriteable(
-            querySearchResult,
-            namedWriteableRegistry,
-            delayed ? in -> new QuerySearchResult(in, true) : QuerySearchResult::new,
-            TransportVersion.current()
-        );
-        assertEquals(querySearchResult.getContextId().getId(), deserialized.getContextId().getId());
-        assertNull(deserialized.getSearchShardTarget());
-        assertEquals(querySearchResult.topDocs().maxScore, deserialized.topDocs().maxScore, 0f);
-        assertEquals(querySearchResult.topDocs().topDocs.totalHits, deserialized.topDocs().topDocs.totalHits);
-        assertEquals(querySearchResult.from(), deserialized.from());
-        assertEquals(querySearchResult.size(), deserialized.size());
-        assertEquals(querySearchResult.hasAggs(), deserialized.hasAggs());
-        if (deserialized.hasAggs()) {
-            assertThat(deserialized.aggregations().isSerialized(), is(delayed));
-            Aggregations aggs = querySearchResult.consumeAggs();
-            Aggregations deserializedAggs = deserialized.consumeAggs();
-            assertEquals(aggs.asList(), deserializedAggs.asList());
-            assertThat(deserialized.aggregations(), is(nullValue()));
+        try {
+            boolean delayed = randomBoolean();
+            QuerySearchResult deserialized = copyWriteable(
+                querySearchResult,
+                namedWriteableRegistry,
+                delayed ? in -> new QuerySearchResult(in, true) : QuerySearchResult::new,
+                TransportVersion.current()
+            );
+            try {
+                assertEquals(querySearchResult.getContextId().getId(), deserialized.getContextId().getId());
+                assertNull(deserialized.getSearchShardTarget());
+                assertEquals(querySearchResult.topDocs().maxScore, deserialized.topDocs().maxScore, 0f);
+                assertEquals(querySearchResult.topDocs().topDocs.totalHits, deserialized.topDocs().topDocs.totalHits);
+                assertEquals(querySearchResult.from(), deserialized.from());
+                assertEquals(querySearchResult.size(), deserialized.size());
+                assertEquals(querySearchResult.hasAggs(), deserialized.hasAggs());
+                if (deserialized.hasAggs()) {
+                    assertThat(deserialized.aggregations().isSerialized(), is(delayed));
+                    Aggregations aggs = querySearchResult.consumeAggs();
+                    Aggregations deserializedAggs = deserialized.consumeAggs();
+                    assertEquals(aggs.asList(), deserializedAggs.asList());
+                    assertThat(deserialized.aggregations(), is(nullValue()));
+                }
+                assertEquals(querySearchResult.terminatedEarly(), deserialized.terminatedEarly());
+            } finally {
+                deserialized.decRef();
+            }
+        } finally {
+            querySearchResult.decRef();
         }
-        assertEquals(querySearchResult.terminatedEarly(), deserialized.terminatedEarly());
     }
 
     public void testNullResponse() throws Exception {
diff --git a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java
index ebc5ca4cd0fd3..c1c4d70e0b906 100644
--- a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java
+++ b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java
@@ -132,7 +132,12 @@ protected SearchContext createContext(
         boolean includeAggregations
     ) throws IOException {
         SearchContext searchContext = super.createContext(readerContext, request, task, resultsType, includeAggregations);
-        onCreateSearchContext.accept(searchContext);
+        try {
+            onCreateSearchContext.accept(searchContext);
+        } catch (Exception e) {
+            searchContext.close();
+            throw e;
+        }
        return searchContext;
    }
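
The recurring discipline in the test changes above: every caller that obtains a
ref-counted result releases its reference in a finally block, and takes an extra
reference before handing the result off to another thread. Below is a minimal
sketch of that discipline, assuming only the org.elasticsearch.core ref-counting
utilities; RefCountingSketch and handOff are illustrative names, not part of this
change.

import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.RefCounted;

public class RefCountingSketch {

    public static void main(String[] args) throws InterruptedException {
        // Creation hands the creator one reference; the Runnable passed to
        // AbstractRefCounted.of runs once the count drops to zero.
        RefCounted result = AbstractRefCounted.of(() -> System.out.println("released"));
        try {
            handOff(result);
        } finally {
            result.decRef(); // release the creator's reference
        }
    }

    private static void handOff(RefCounted result) throws InterruptedException {
        // Take an extra reference before the result escapes to another thread,
        // and release it on the receiving side, mirroring the sendExecuteQuery
        // and consumeResult changes above.
        result.incRef();
        Thread thread = new Thread(() -> {
            try {
                // ... consume the result ...
            } finally {
                result.decRef(); // release the hand-off reference
            }
        });
        thread.start();
        thread.join();
    }
}

With assertions enabled, wrapping such a RefCounted in the leak tracker (as the
production change in QueryFetchSearchResult does) reports any instance whose
final decRef never happens, which is what makes the stricter test hygiene above
enforceable.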