Skip to content

Commit

Permalink
Add internalClusterTest for and fix leak in ExpandSearchPhase (#108562)…
Browse files Browse the repository at this point in the history
… (#108629)

`ExpandSearchPhase` was leaking `SearchHits` when a pooled `SearchHits`
that was read from the wire was added to an unpooled `SearchHit`.
This commit makes the relevant `SearchHit` instances pooled where they need
to be, so that they correctly release their nested hits. This requires a
couple of smaller adjustments in the codebase, mainly around error handling.
  • Loading branch information
original-brownbear authored May 14, 2024
1 parent ef7130a commit c309c20
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 63 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/108562.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 108562
summary: Add `internalClusterTest` for and fix leak in `ExpandSearchPhase`
area: Search
type: bug
issues:
- 108369
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.search;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.index.query.InnerHitBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.search.collapse.CollapseBuilder;
import org.elasticsearch.test.ESIntegTestCase;

import java.util.Map;
import java.util.Set;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailuresAndResponse;

public class CollapseSearchResultsIT extends ESIntegTestCase {

    /**
     * Indexes two documents with distinct values of a keyword collapse field and
     * verifies that a collapsed search (with inner hits) reports the collapse field
     * name and both collapse values without failures.
     */
    public void testCollapse() {
        final String indexName = "test_collapse";
        final String collapseField = "collapse_field";
        createIndex(indexName);
        assertAcked(indicesAdmin().preparePutMapping(indexName).setSource(collapseField, "type=keyword"));
        index(indexName, "id_1", Map.of(collapseField, "value1"));
        index(indexName, "id_2", Map.of(collapseField, "value2"));
        refresh(indexName);
        // Collapse on the keyword field, requesting up to two inner hits per group.
        final var request = prepareSearch(indexName).setQuery(new MatchAllQueryBuilder())
            .setCollapse(new CollapseBuilder(collapseField).setInnerHits(new InnerHitBuilder("ih").setSize(2)));
        assertNoFailuresAndResponse(request, response -> {
            final var hits = response.getHits();
            assertEquals(collapseField, hits.getCollapseField());
            assertEquals(Set.of(new BytesRef("value1"), new BytesRef("value2")), Set.of(hits.getCollapseValues()));
        });
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,66 +44,69 @@ final class ExpandSearchPhase extends SearchPhase {
* Returns <code>true</code> iff the search request has inner hits and needs field collapsing
*/
private boolean isCollapseRequest() {
    // Field collapsing with expansion only applies when the request carries a
    // source with a collapse section that declares at least one inner hit.
    // (The diff residue here had duplicated old/new bodies — the second return
    // was unreachable, which is a compile error in Java; only one body is kept.)
    final var searchSource = context.getRequest().source();
    return searchSource != null && searchSource.collapse() != null && searchSource.collapse().getInnerHits().isEmpty() == false;
}

@Override
public void run() {
if (isCollapseRequest() && searchHits.getHits().length > 0) {
SearchRequest searchRequest = context.getRequest();
CollapseBuilder collapseBuilder = searchRequest.source().collapse();
final List<InnerHitBuilder> innerHitBuilders = collapseBuilder.getInnerHits();
MultiSearchRequest multiRequest = new MultiSearchRequest();
if (collapseBuilder.getMaxConcurrentGroupRequests() > 0) {
multiRequest.maxConcurrentSearchRequests(collapseBuilder.getMaxConcurrentGroupRequests());
if (isCollapseRequest() == false || searchHits.getHits().length == 0) {
onPhaseDone();
} else {
doRun();
}
}

private void doRun() {
SearchRequest searchRequest = context.getRequest();
CollapseBuilder collapseBuilder = searchRequest.source().collapse();
final List<InnerHitBuilder> innerHitBuilders = collapseBuilder.getInnerHits();
MultiSearchRequest multiRequest = new MultiSearchRequest();
if (collapseBuilder.getMaxConcurrentGroupRequests() > 0) {
multiRequest.maxConcurrentSearchRequests(collapseBuilder.getMaxConcurrentGroupRequests());
}
for (SearchHit hit : searchHits.getHits()) {
BoolQueryBuilder groupQuery = new BoolQueryBuilder();
Object collapseValue = hit.field(collapseBuilder.getField()).getValue();
if (collapseValue != null) {
groupQuery.filter(QueryBuilders.matchQuery(collapseBuilder.getField(), collapseValue));
} else {
groupQuery.mustNot(QueryBuilders.existsQuery(collapseBuilder.getField()));
}
QueryBuilder origQuery = searchRequest.source().query();
if (origQuery != null) {
groupQuery.must(origQuery);
}
for (InnerHitBuilder innerHitBuilder : innerHitBuilders) {
CollapseBuilder innerCollapseBuilder = innerHitBuilder.getInnerCollapseBuilder();
SearchSourceBuilder sourceBuilder = buildExpandSearchSourceBuilder(innerHitBuilder, innerCollapseBuilder).query(groupQuery)
.postFilter(searchRequest.source().postFilter())
.runtimeMappings(searchRequest.source().runtimeMappings());
SearchRequest groupRequest = new SearchRequest(searchRequest);
groupRequest.source(sourceBuilder);
multiRequest.add(groupRequest);
}
}
context.getSearchTransport().sendExecuteMultiSearch(multiRequest, context.getTask(), ActionListener.wrap(response -> {
Iterator<MultiSearchResponse.Item> it = response.iterator();
for (SearchHit hit : searchHits.getHits()) {
BoolQueryBuilder groupQuery = new BoolQueryBuilder();
Object collapseValue = hit.field(collapseBuilder.getField()).getValue();
if (collapseValue != null) {
groupQuery.filter(QueryBuilders.matchQuery(collapseBuilder.getField(), collapseValue));
} else {
groupQuery.mustNot(QueryBuilders.existsQuery(collapseBuilder.getField()));
}
QueryBuilder origQuery = searchRequest.source().query();
if (origQuery != null) {
groupQuery.must(origQuery);
}
for (InnerHitBuilder innerHitBuilder : innerHitBuilders) {
CollapseBuilder innerCollapseBuilder = innerHitBuilder.getInnerCollapseBuilder();
SearchSourceBuilder sourceBuilder = buildExpandSearchSourceBuilder(innerHitBuilder, innerCollapseBuilder).query(
groupQuery
).postFilter(searchRequest.source().postFilter()).runtimeMappings(searchRequest.source().runtimeMappings());
SearchRequest groupRequest = new SearchRequest(searchRequest);
groupRequest.source(sourceBuilder);
multiRequest.add(groupRequest);
}
}
context.getSearchTransport().sendExecuteMultiSearch(multiRequest, context.getTask(), ActionListener.wrap(response -> {
Iterator<MultiSearchResponse.Item> it = response.iterator();
for (SearchHit hit : searchHits.getHits()) {
for (InnerHitBuilder innerHitBuilder : innerHitBuilders) {
MultiSearchResponse.Item item = it.next();
if (item.isFailure()) {
context.onPhaseFailure(this, "failed to expand hits", item.getFailure());
return;
}
SearchHits innerHits = item.getResponse().getHits();
if (hit.getInnerHits() == null) {
hit.setInnerHits(Maps.newMapWithExpectedSize(innerHitBuilders.size()));
}
hit.getInnerHits().put(innerHitBuilder.getName(), innerHits);
innerHits.mustIncRef();
MultiSearchResponse.Item item = it.next();
if (item.isFailure()) {
context.onPhaseFailure(this, "failed to expand hits", item.getFailure());
return;
}
SearchHits innerHits = item.getResponse().getHits();
if (hit.getInnerHits() == null) {
hit.setInnerHits(Maps.newMapWithExpectedSize(innerHitBuilders.size()));
}
hit.getInnerHits().put(innerHitBuilder.getName(), innerHits);
assert innerHits.isPooled() == false || hit.isPooled() : "pooled inner hits can only be added to a pooled hit";
innerHits.mustIncRef();
}
onPhaseDone();
}, context::onFailure));
} else {
}
onPhaseDone();
}
}, context::onFailure));
}

private static SearchSourceBuilder buildExpandSearchSourceBuilder(InnerHitBuilder options, CollapseBuilder innerCollapseBuilder) {
Expand Down
11 changes: 9 additions & 2 deletions server/src/main/java/org/elasticsearch/search/SearchHits.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,24 +132,31 @@ public static SearchHits readFrom(StreamInput in, boolean pooled) throws IOExcep
final float maxScore = in.readFloat();
int size = in.readVInt();
final SearchHit[] hits;
boolean isPooled = false;
if (size == 0) {
hits = EMPTY;
} else {
hits = new SearchHit[size];
for (int i = 0; i < hits.length; i++) {
hits[i] = SearchHit.readFrom(in, pooled);
var hit = SearchHit.readFrom(in, pooled);
hits[i] = hit;
isPooled = isPooled || hit.isPooled();
}
}
var sortFields = in.readOptionalArray(Lucene::readSortField, SortField[]::new);
var collapseField = in.readOptionalString();
var collapseValues = in.readOptionalArray(Lucene::readSortValue, Object[]::new);
if (pooled) {
if (isPooled) {
return new SearchHits(hits, totalHits, maxScore, sortFields, collapseField, collapseValues);
} else {
return unpooled(hits, totalHits, maxScore, sortFields, collapseField, collapseValues);
}
}

/**
 * Returns {@code true} if this instance's lifecycle is tracked by actual
 * reference counting, i.e. it was not created with the shared
 * {@code ALWAYS_REFERENCED} counter used for unpooled instances. Pooled
 * instances must be released via {@code decRef} by their owner.
 */
public boolean isPooled() {
return refCounted != ALWAYS_REFERENCED;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
assert hasReferences();
Expand Down
32 changes: 22 additions & 10 deletions server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,23 +163,35 @@ protected SearchHit nextDoc(int doc) throws IOException {
leafSourceLoader,
leafIdLoader
);
sourceProvider.source = hit.source();
fieldLookupProvider.storedFields = hit.loadedFields();
for (FetchSubPhaseProcessor processor : processors) {
processor.process(hit);
boolean success = false;
try {
sourceProvider.source = hit.source();
fieldLookupProvider.storedFields = hit.loadedFields();
for (FetchSubPhaseProcessor processor : processors) {
processor.process(hit);
}
success = true;
return hit.hit();
} finally {
if (success == false) {
hit.hit().decRef();
}
}
return hit.hit();
}
};

SearchHit[] hits = docsIterator.iterate(context.shardTarget(), context.searcher().getIndexReader(), docIdsToLoad);

if (context.isCancelled()) {
for (SearchHit hit : hits) {
// release all hits that would otherwise become owned and eventually released by SearchHits below
hit.decRef();
}
throw new TaskCancelledException("cancelled");
}

TotalHits totalHits = context.getTotalHits();
return SearchHits.unpooled(hits, totalHits, context.getMaxScore());
return new SearchHits(hits, totalHits, context.getMaxScore());
}

List<FetchSubPhaseProcessor> getProcessors(SearchShardTarget target, FetchContext context, Profiler profiler) {
Expand Down Expand Up @@ -253,12 +265,12 @@ private static HitContext prepareNonNestedHitContext(

String id = idLoader.getId(subDocId);
if (id == null) {
// TODO: can we use pooled buffers here as well?
SearchHit hit = SearchHit.unpooled(docId, null);
SearchHit hit = new SearchHit(docId);
// TODO: can we use real pooled buffers here as well?
Source source = Source.lazy(lazyStoredSourceLoader(profiler, subReaderContext, subDocId));
return new HitContext(hit, subReaderContext, subDocId, Map.of(), source);
} else {
SearchHit hit = SearchHit.unpooled(docId, id);
SearchHit hit = new SearchHit(docId, id);
Source source;
if (requiresSource) {
Timer timer = profiler.startLoadingSource();
Expand Down Expand Up @@ -335,7 +347,7 @@ private static HitContext prepareNestedHitContext(
assert nestedIdentity != null;
Source nestedSource = nestedIdentity.extractSource(rootSource);

SearchHit hit = SearchHit.unpooled(topDocId, rootId, nestedIdentity);
SearchHit hit = new SearchHit(topDocId, rootId, nestedIdentity);
return new HitContext(hit, subReaderContext, nestedInfo.doc(), childFieldLoader.storedFields(), nestedSource);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public final SearchHit[] iterate(SearchShardTarget shardTarget, IndexReader inde
setNextReader(ctx, docsInLeaf);
}
currentDoc = docs[i].docId;
assert searchHits[docs[i].index] == null;
searchHits[docs[i].index] = nextDoc(docs[i].docId);
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,13 @@ public FetchSearchResult fetchResult() {

/**
 * Stores the shard-level hits and profile result on this fetch result, taking
 * a reference on {@code hits} and releasing any hits previously stored here.
 */
public void shardResult(SearchHits hits, ProfileResult profileResult) {
    assert assertNoSearchTarget(hits);
    assert hasReferences();
    // Release a previously stored result so it is not leaked when replaced.
    var existing = this.hits;
    if (existing != null) {
        existing.decRef();
    }
    this.hits = hits;
    // Diff residue had both incRef() and mustIncRef() here, which would
    // double-increment the reference count; take exactly one reference.
    hits.mustIncRef();
    assert this.profileResult == null;
    this.profileResult = profileResult;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ private void hitExecute(Map<String, InnerHitsContext.InnerHitSubContext> innerHi
}
}
var h = fetchResult.hits();
assert hit.isPooled() || h.isPooled() == false;
results.put(entry.getKey(), h);
h.mustIncRef();
}
Expand Down

0 comments on commit c309c20

Please sign in to comment.