diff --git a/docs/changelog/108562.yaml b/docs/changelog/108562.yaml
new file mode 100644
index 0000000000000..2a0047fe807fd
--- /dev/null
+++ b/docs/changelog/108562.yaml
@@ -0,0 +1,6 @@
+pr: 108562
+summary: Add `internalClusterTest` for and fix leak in `ExpandSearchPhase`
+area: Search
+type: bug
+issues:
+ - 108369
diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/CollapseSearchResultsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/CollapseSearchResultsIT.java
new file mode 100644
index 0000000000000..a12a26d69c5ff
--- /dev/null
+++ b/server/src/internalClusterTest/java/org/elasticsearch/search/CollapseSearchResultsIT.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.search;
+
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.index.query.InnerHitBuilder;
+import org.elasticsearch.index.query.MatchAllQueryBuilder;
+import org.elasticsearch.search.collapse.CollapseBuilder;
+import org.elasticsearch.test.ESIntegTestCase;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailuresAndResponse;
+
+public class CollapseSearchResultsIT extends ESIntegTestCase {
+
+ public void testCollapse() {
+ final String indexName = "test_collapse";
+ createIndex(indexName);
+ final String collapseField = "collapse_field";
+ assertAcked(indicesAdmin().preparePutMapping(indexName).setSource(collapseField, "type=keyword"));
+ index(indexName, "id_1", Map.of(collapseField, "value1"));
+ index(indexName, "id_2", Map.of(collapseField, "value2"));
+ refresh(indexName);
+ assertNoFailuresAndResponse(
+ prepareSearch(indexName).setQuery(new MatchAllQueryBuilder())
+ .setCollapse(new CollapseBuilder(collapseField).setInnerHits(new InnerHitBuilder("ih").setSize(2))),
+ searchResponse -> {
+ assertEquals(collapseField, searchResponse.getHits().getCollapseField());
+ assertEquals(Set.of(new BytesRef("value1"), new BytesRef("value2")), Set.of(searchResponse.getHits().getCollapseValues()));
+ }
+ );
+ }
+}
diff --git a/server/src/main/java/org/elasticsearch/action/search/ExpandSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/ExpandSearchPhase.java
index 149cdb9206b34..e8470ba77632f 100644
--- a/server/src/main/java/org/elasticsearch/action/search/ExpandSearchPhase.java
+++ b/server/src/main/java/org/elasticsearch/action/search/ExpandSearchPhase.java
@@ -44,66 +44,69 @@ final class ExpandSearchPhase extends SearchPhase {
* Returns true
iff the search request has inner hits and needs field collapsing
*/
private boolean isCollapseRequest() {
- final SearchRequest searchRequest = context.getRequest();
- return searchRequest.source() != null
- && searchRequest.source().collapse() != null
- && searchRequest.source().collapse().getInnerHits().isEmpty() == false;
+ final var searchSource = context.getRequest().source();
+ return searchSource != null && searchSource.collapse() != null && searchSource.collapse().getInnerHits().isEmpty() == false;
}
@Override
public void run() {
- if (isCollapseRequest() && searchHits.getHits().length > 0) {
- SearchRequest searchRequest = context.getRequest();
- CollapseBuilder collapseBuilder = searchRequest.source().collapse();
- final List innerHitBuilders = collapseBuilder.getInnerHits();
- MultiSearchRequest multiRequest = new MultiSearchRequest();
- if (collapseBuilder.getMaxConcurrentGroupRequests() > 0) {
- multiRequest.maxConcurrentSearchRequests(collapseBuilder.getMaxConcurrentGroupRequests());
+ if (isCollapseRequest() == false || searchHits.getHits().length == 0) {
+ onPhaseDone();
+ } else {
+ doRun();
+ }
+ }
+
+ private void doRun() {
+ SearchRequest searchRequest = context.getRequest();
+ CollapseBuilder collapseBuilder = searchRequest.source().collapse();
+ final List innerHitBuilders = collapseBuilder.getInnerHits();
+ MultiSearchRequest multiRequest = new MultiSearchRequest();
+ if (collapseBuilder.getMaxConcurrentGroupRequests() > 0) {
+ multiRequest.maxConcurrentSearchRequests(collapseBuilder.getMaxConcurrentGroupRequests());
+ }
+ for (SearchHit hit : searchHits.getHits()) {
+ BoolQueryBuilder groupQuery = new BoolQueryBuilder();
+ Object collapseValue = hit.field(collapseBuilder.getField()).getValue();
+ if (collapseValue != null) {
+ groupQuery.filter(QueryBuilders.matchQuery(collapseBuilder.getField(), collapseValue));
+ } else {
+ groupQuery.mustNot(QueryBuilders.existsQuery(collapseBuilder.getField()));
+ }
+ QueryBuilder origQuery = searchRequest.source().query();
+ if (origQuery != null) {
+ groupQuery.must(origQuery);
+ }
+ for (InnerHitBuilder innerHitBuilder : innerHitBuilders) {
+ CollapseBuilder innerCollapseBuilder = innerHitBuilder.getInnerCollapseBuilder();
+ SearchSourceBuilder sourceBuilder = buildExpandSearchSourceBuilder(innerHitBuilder, innerCollapseBuilder).query(groupQuery)
+ .postFilter(searchRequest.source().postFilter())
+ .runtimeMappings(searchRequest.source().runtimeMappings());
+ SearchRequest groupRequest = new SearchRequest(searchRequest);
+ groupRequest.source(sourceBuilder);
+ multiRequest.add(groupRequest);
}
+ }
+ context.getSearchTransport().sendExecuteMultiSearch(multiRequest, context.getTask(), ActionListener.wrap(response -> {
+ Iterator it = response.iterator();
for (SearchHit hit : searchHits.getHits()) {
- BoolQueryBuilder groupQuery = new BoolQueryBuilder();
- Object collapseValue = hit.field(collapseBuilder.getField()).getValue();
- if (collapseValue != null) {
- groupQuery.filter(QueryBuilders.matchQuery(collapseBuilder.getField(), collapseValue));
- } else {
- groupQuery.mustNot(QueryBuilders.existsQuery(collapseBuilder.getField()));
- }
- QueryBuilder origQuery = searchRequest.source().query();
- if (origQuery != null) {
- groupQuery.must(origQuery);
- }
for (InnerHitBuilder innerHitBuilder : innerHitBuilders) {
- CollapseBuilder innerCollapseBuilder = innerHitBuilder.getInnerCollapseBuilder();
- SearchSourceBuilder sourceBuilder = buildExpandSearchSourceBuilder(innerHitBuilder, innerCollapseBuilder).query(
- groupQuery
- ).postFilter(searchRequest.source().postFilter()).runtimeMappings(searchRequest.source().runtimeMappings());
- SearchRequest groupRequest = new SearchRequest(searchRequest);
- groupRequest.source(sourceBuilder);
- multiRequest.add(groupRequest);
- }
- }
- context.getSearchTransport().sendExecuteMultiSearch(multiRequest, context.getTask(), ActionListener.wrap(response -> {
- Iterator it = response.iterator();
- for (SearchHit hit : searchHits.getHits()) {
- for (InnerHitBuilder innerHitBuilder : innerHitBuilders) {
- MultiSearchResponse.Item item = it.next();
- if (item.isFailure()) {
- context.onPhaseFailure(this, "failed to expand hits", item.getFailure());
- return;
- }
- SearchHits innerHits = item.getResponse().getHits();
- if (hit.getInnerHits() == null) {
- hit.setInnerHits(Maps.newMapWithExpectedSize(innerHitBuilders.size()));
- }
- hit.getInnerHits().put(innerHitBuilder.getName(), innerHits);
- innerHits.mustIncRef();
+ MultiSearchResponse.Item item = it.next();
+ if (item.isFailure()) {
+ context.onPhaseFailure(this, "failed to expand hits", item.getFailure());
+ return;
}
+ SearchHits innerHits = item.getResponse().getHits();
+ if (hit.getInnerHits() == null) {
+ hit.setInnerHits(Maps.newMapWithExpectedSize(innerHitBuilders.size()));
+ }
+ hit.getInnerHits().put(innerHitBuilder.getName(), innerHits);
+ assert innerHits.isPooled() == false || hit.isPooled() : "pooled inner hits can only be added to a pooled hit";
+ innerHits.mustIncRef();
}
- onPhaseDone();
- }, context::onFailure));
- } else {
+ }
onPhaseDone();
- }
+ }, context::onFailure));
}
private static SearchSourceBuilder buildExpandSearchSourceBuilder(InnerHitBuilder options, CollapseBuilder innerCollapseBuilder) {
diff --git a/server/src/main/java/org/elasticsearch/search/SearchHits.java b/server/src/main/java/org/elasticsearch/search/SearchHits.java
index ce8ccf4b7f0e6..d559fc60fa72d 100644
--- a/server/src/main/java/org/elasticsearch/search/SearchHits.java
+++ b/server/src/main/java/org/elasticsearch/search/SearchHits.java
@@ -132,24 +132,31 @@ public static SearchHits readFrom(StreamInput in, boolean pooled) throws IOExcep
final float maxScore = in.readFloat();
int size = in.readVInt();
final SearchHit[] hits;
+ boolean isPooled = false;
if (size == 0) {
hits = EMPTY;
} else {
hits = new SearchHit[size];
for (int i = 0; i < hits.length; i++) {
- hits[i] = SearchHit.readFrom(in, pooled);
+ var hit = SearchHit.readFrom(in, pooled);
+ hits[i] = hit;
+ isPooled = isPooled || hit.isPooled();
}
}
var sortFields = in.readOptionalArray(Lucene::readSortField, SortField[]::new);
var collapseField = in.readOptionalString();
var collapseValues = in.readOptionalArray(Lucene::readSortValue, Object[]::new);
- if (pooled) {
+ if (isPooled) {
return new SearchHits(hits, totalHits, maxScore, sortFields, collapseField, collapseValues);
} else {
return unpooled(hits, totalHits, maxScore, sortFields, collapseField, collapseValues);
}
}
+ public boolean isPooled() {
+ return refCounted != ALWAYS_REFERENCED;
+ }
+
@Override
public void writeTo(StreamOutput out) throws IOException {
assert hasReferences();
diff --git a/server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java b/server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java
index 2fa3e903a0074..03c54113f6ff0 100644
--- a/server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java
+++ b/server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java
@@ -163,23 +163,35 @@ protected SearchHit nextDoc(int doc) throws IOException {
leafSourceLoader,
leafIdLoader
);
- sourceProvider.source = hit.source();
- fieldLookupProvider.storedFields = hit.loadedFields();
- for (FetchSubPhaseProcessor processor : processors) {
- processor.process(hit);
+ boolean success = false;
+ try {
+ sourceProvider.source = hit.source();
+ fieldLookupProvider.storedFields = hit.loadedFields();
+ for (FetchSubPhaseProcessor processor : processors) {
+ processor.process(hit);
+ }
+ success = true;
+ return hit.hit();
+ } finally {
+ if (success == false) {
+ hit.hit().decRef();
+ }
}
- return hit.hit();
}
};
SearchHit[] hits = docsIterator.iterate(context.shardTarget(), context.searcher().getIndexReader(), docIdsToLoad);
if (context.isCancelled()) {
+ for (SearchHit hit : hits) {
+ // release all hits that would otherwise become owned and eventually released by SearchHits below
+ hit.decRef();
+ }
throw new TaskCancelledException("cancelled");
}
TotalHits totalHits = context.getTotalHits();
- return SearchHits.unpooled(hits, totalHits, context.getMaxScore());
+ return new SearchHits(hits, totalHits, context.getMaxScore());
}
List getProcessors(SearchShardTarget target, FetchContext context, Profiler profiler) {
@@ -253,12 +265,12 @@ private static HitContext prepareNonNestedHitContext(
String id = idLoader.getId(subDocId);
if (id == null) {
- // TODO: can we use pooled buffers here as well?
- SearchHit hit = SearchHit.unpooled(docId, null);
+ SearchHit hit = new SearchHit(docId);
+ // TODO: can we use real pooled buffers here as well?
Source source = Source.lazy(lazyStoredSourceLoader(profiler, subReaderContext, subDocId));
return new HitContext(hit, subReaderContext, subDocId, Map.of(), source);
} else {
- SearchHit hit = SearchHit.unpooled(docId, id);
+ SearchHit hit = new SearchHit(docId, id);
Source source;
if (requiresSource) {
Timer timer = profiler.startLoadingSource();
@@ -335,7 +347,7 @@ private static HitContext prepareNestedHitContext(
assert nestedIdentity != null;
Source nestedSource = nestedIdentity.extractSource(rootSource);
- SearchHit hit = SearchHit.unpooled(topDocId, rootId, nestedIdentity);
+ SearchHit hit = new SearchHit(topDocId, rootId, nestedIdentity);
return new HitContext(hit, subReaderContext, nestedInfo.doc(), childFieldLoader.storedFields(), nestedSource);
}
diff --git a/server/src/main/java/org/elasticsearch/search/fetch/FetchPhaseDocsIterator.java b/server/src/main/java/org/elasticsearch/search/fetch/FetchPhaseDocsIterator.java
index cc39113f2009f..81b3e7465feee 100644
--- a/server/src/main/java/org/elasticsearch/search/fetch/FetchPhaseDocsIterator.java
+++ b/server/src/main/java/org/elasticsearch/search/fetch/FetchPhaseDocsIterator.java
@@ -67,6 +67,7 @@ public final SearchHit[] iterate(SearchShardTarget shardTarget, IndexReader inde
setNextReader(ctx, docsInLeaf);
}
currentDoc = docs[i].docId;
+ assert searchHits[docs[i].index] == null;
searchHits[docs[i].index] = nextDoc(docs[i].docId);
}
} catch (Exception e) {
diff --git a/server/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java b/server/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java
index 4c3d3948ff889..4170f7e2f8b4b 100644
--- a/server/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java
+++ b/server/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java
@@ -61,8 +61,13 @@ public FetchSearchResult fetchResult() {
public void shardResult(SearchHits hits, ProfileResult profileResult) {
assert assertNoSearchTarget(hits);
+ assert hasReferences();
+ var existing = this.hits;
+ if (existing != null) {
+ existing.decRef();
+ }
this.hits = hits;
- hits.incRef();
+ hits.mustIncRef();
assert this.profileResult == null;
this.profileResult = profileResult;
}
diff --git a/server/src/main/java/org/elasticsearch/search/fetch/subphase/InnerHitsPhase.java b/server/src/main/java/org/elasticsearch/search/fetch/subphase/InnerHitsPhase.java
index ccb54801472a6..a4ba982e1dd73 100644
--- a/server/src/main/java/org/elasticsearch/search/fetch/subphase/InnerHitsPhase.java
+++ b/server/src/main/java/org/elasticsearch/search/fetch/subphase/InnerHitsPhase.java
@@ -104,6 +104,7 @@ private void hitExecute(Map innerHi
}
}
var h = fetchResult.hits();
+ assert hit.isPooled() || h.isPooled() == false;
results.put(entry.getKey(), h);
h.mustIncRef();
}