Skip to content

Commit

Permalink
Fix SearchResponse leaks in searchable snapshot test and production c…
Browse files Browse the repository at this point in the history
…ode (#103118)

It's in the title. Fix all the tests like we have everywhere else.
Fix the production use of SearchResponse in
BlobStoreCacheMaintenanceService by properly ref-counting the response
it holds on to.
  • Loading branch information
original-brownbear authored Dec 10, 2023
1 parent 80b222c commit 64818df
Show file tree
Hide file tree
Showing 11 changed files with 270 additions and 265 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,22 @@
*/
package org.elasticsearch.search;

import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.search.SearchRequestBuilder;

public enum SearchResponseUtils {
;

public static long getTotalHitsValue(SearchRequestBuilder request) {
public static TotalHits getTotalHits(SearchRequestBuilder request) {
var resp = request.get();
try {
return resp.getHits().getTotalHits().value;
return resp.getHits().getTotalHits();
} finally {
resp.decRef();
}
}

public static long getTotalHitsValue(SearchRequestBuilder request) {
return getTotalHits(request).value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import static org.elasticsearch.license.LicenseSettings.SELF_GENERATED_LICENSE_TYPE;
import static org.elasticsearch.test.NodeRoles.addRoles;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
import static org.elasticsearch.xpack.searchablesnapshots.cache.common.TestUtils.pageAligned;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
Expand Down Expand Up @@ -289,10 +290,10 @@ protected void assertTotalHits(String indexName, TotalHits originalAllHits, Tota
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
allHits.set(t, prepareSearch(indexName).setTrackTotalHits(true).get().getHits().getTotalHits());
barHits.set(
t,
prepareSearch(indexName).setTrackTotalHits(true).setQuery(matchQuery("foo", "bar")).get().getHits().getTotalHits()
assertResponse(prepareSearch(indexName).setTrackTotalHits(true), resp -> allHits.set(t, resp.getHits().getTotalHits()));
assertResponse(
prepareSearch(indexName).setTrackTotalHits(true).setQuery(matchQuery("foo", "bar")),
resp -> barHits.set(t, resp.getHits().getTotalHits())
);
});
threads[i].start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.ClosePointInTimeRequest;
import org.elasticsearch.action.search.OpenPointInTimeRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.search.TransportClosePointInTimeAction;
import org.elasticsearch.action.search.TransportOpenPointInTimeAction;
Expand All @@ -32,7 +31,7 @@

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailuresAndResponse;
import static org.hamcrest.Matchers.equalTo;

public class RetrySearchIntegTests extends BaseSearchableSnapshotsIntegTestCase {
Expand Down Expand Up @@ -144,30 +143,31 @@ public void testRetryPointInTime() throws Exception {
).keepAlive(TimeValue.timeValueMinutes(2));
final String pitId = client().execute(TransportOpenPointInTimeAction.TYPE, openRequest).actionGet().getPointInTimeId();
try {
SearchResponse resp = prepareSearch().setIndices(indexName)
.setPreference(null)
.setPointInTime(new PointInTimeBuilder(pitId))
.get();
assertNoFailures(resp);
assertThat(resp.pointInTimeId(), equalTo(pitId));
assertHitCount(resp, docCount);

assertNoFailuresAndResponse(
prepareSearch().setIndices(indexName).setPreference(null).setPointInTime(new PointInTimeBuilder(pitId)),
resp -> {
assertThat(resp.pointInTimeId(), equalTo(pitId));
assertHitCount(resp, docCount);
}
);
final Set<String> allocatedNodes = internalCluster().nodesInclude(indexName);
for (String allocatedNode : allocatedNodes) {
internalCluster().restartNode(allocatedNode);
}
ensureGreen(indexName);
resp = prepareSearch().setIndices(indexName)
.setQuery(new RangeQueryBuilder("created_date").gte("2011-01-01").lte("2011-12-12"))
.setSearchType(SearchType.QUERY_THEN_FETCH)
.setPreference(null)
.setPreFilterShardSize(between(1, 10))
.setAllowPartialSearchResults(true)
.setPointInTime(new PointInTimeBuilder(pitId))
.get();
assertNoFailures(resp);
assertThat(resp.pointInTimeId(), equalTo(pitId));
assertHitCount(resp, docCount);
assertNoFailuresAndResponse(
prepareSearch().setIndices(indexName)
.setQuery(new RangeQueryBuilder("created_date").gte("2011-01-01").lte("2011-12-12"))
.setSearchType(SearchType.QUERY_THEN_FETCH)
.setPreference(null)
.setPreFilterShardSize(between(1, 10))
.setAllowPartialSearchResults(true)
.setPointInTime(new PointInTimeBuilder(pitId)),
resp -> {
assertThat(resp.pointInTimeId(), equalTo(pitId));
assertHitCount(resp, docCount);
}
);
} finally {
client().execute(TransportClosePointInTimeAction.TYPE, new ClosePointInTimeRequest(pitId)).actionGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING;
import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
Expand Down Expand Up @@ -182,14 +183,14 @@ public void testSearchableSnapshotShardsAreSkippedBySearchRequestWithoutQuerying
.source(new SearchSourceBuilder().query(rangeQuery));

if (includeIndexCoveringSearchRangeInSearchRequest) {
SearchResponse searchResponse = client().search(request).actionGet();

// All the regular index searches succeeded
assertThat(searchResponse.getSuccessfulShards(), equalTo(indexWithinSearchRangeShardCount));
// All the searchable snapshots shard search failed
assertThat(searchResponse.getFailedShards(), equalTo(indexOutsideSearchRangeShardCount));
assertThat(searchResponse.getSkippedShards(), equalTo(0));
assertThat(searchResponse.getTotalShards(), equalTo(totalShards));
assertResponse(client().search(request), searchResponse -> {
// All the regular index searches succeeded
assertThat(searchResponse.getSuccessfulShards(), equalTo(indexWithinSearchRangeShardCount));
// All the searchable snapshots shard search failed
assertThat(searchResponse.getFailedShards(), equalTo(indexOutsideSearchRangeShardCount));
assertThat(searchResponse.getSkippedShards(), equalTo(0));
assertThat(searchResponse.getTotalShards(), equalTo(totalShards));
});
} else {
// All shards failed, since all shards are unassigned and the IndexMetadata min/max timestamp
// is not available yet
Expand Down Expand Up @@ -271,13 +272,13 @@ public void testSearchableSnapshotShardsAreSkippedBySearchRequestWithoutQuerying
waitUntilAllShardsAreUnassigned(updatedIndexMetadata.getIndex());

if (includeIndexCoveringSearchRangeInSearchRequest) {
SearchResponse newSearchResponse = client().search(request).actionGet();

assertThat(newSearchResponse.getSkippedShards(), equalTo(indexOutsideSearchRangeShardCount));
assertThat(newSearchResponse.getSuccessfulShards(), equalTo(totalShards));
assertThat(newSearchResponse.getFailedShards(), equalTo(0));
assertThat(newSearchResponse.getTotalShards(), equalTo(totalShards));
assertThat(newSearchResponse.getHits().getTotalHits().value, equalTo((long) numDocsWithinRange));
assertResponse(client().search(request), newSearchResponse -> {
assertThat(newSearchResponse.getSkippedShards(), equalTo(indexOutsideSearchRangeShardCount));
assertThat(newSearchResponse.getSuccessfulShards(), equalTo(totalShards));
assertThat(newSearchResponse.getFailedShards(), equalTo(0));
assertThat(newSearchResponse.getTotalShards(), equalTo(totalShards));
assertThat(newSearchResponse.getHits().getTotalHits().value, equalTo((long) numDocsWithinRange));
});

// test with SearchShardsAPI
{
Expand Down Expand Up @@ -338,13 +339,14 @@ public void testSearchableSnapshotShardsAreSkippedBySearchRequestWithoutQuerying
}
}
} else {
SearchResponse newSearchResponse = client().search(request).actionGet();
// When all shards are skipped, at least one of them should be queried in order to
// provide a proper search response.
assertThat(newSearchResponse.getSkippedShards(), equalTo(indexOutsideSearchRangeShardCount - 1));
assertThat(newSearchResponse.getSuccessfulShards(), equalTo(indexOutsideSearchRangeShardCount - 1));
assertThat(newSearchResponse.getFailedShards(), equalTo(1));
assertThat(newSearchResponse.getTotalShards(), equalTo(indexOutsideSearchRangeShardCount));
assertResponse(client().search(request), newSearchResponse -> {
// When all shards are skipped, at least one of them should be queried in order to
// provide a proper search response.
assertThat(newSearchResponse.getSkippedShards(), equalTo(indexOutsideSearchRangeShardCount - 1));
assertThat(newSearchResponse.getSuccessfulShards(), equalTo(indexOutsideSearchRangeShardCount - 1));
assertThat(newSearchResponse.getFailedShards(), equalTo(1));
assertThat(newSearchResponse.getTotalShards(), equalTo(indexOutsideSearchRangeShardCount));
});

// test with SearchShardsAPI
{
Expand Down Expand Up @@ -449,14 +451,15 @@ public void testQueryPhaseIsExecutedInAnAvailableNodeWhenAllShardsCanBeSkipped()

// test with Search API
{
SearchResponse searchResponse = client().search(request).actionGet();
// All the regular index searches succeeded
assertThat(searchResponse.getSuccessfulShards(), equalTo(indexOutsideSearchRangeShardCount));
// All the searchable snapshots shard search failed
assertThat(searchResponse.getFailedShards(), equalTo(indexOutsideSearchRangeShardCount));
assertThat(searchResponse.getSkippedShards(), equalTo(searchableSnapshotShardCount));
assertThat(searchResponse.getTotalShards(), equalTo(totalShards));
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(0L));
assertResponse(client().search(request), searchResponse -> {
// All the regular index searches succeeded
assertThat(searchResponse.getSuccessfulShards(), equalTo(indexOutsideSearchRangeShardCount));
// All the searchable snapshots shard search failed
assertThat(searchResponse.getFailedShards(), equalTo(indexOutsideSearchRangeShardCount));
assertThat(searchResponse.getSkippedShards(), equalTo(searchableSnapshotShardCount));
assertThat(searchResponse.getTotalShards(), equalTo(totalShards));
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(0L));
});
}

// test with SearchShards API
Expand Down Expand Up @@ -513,16 +516,16 @@ public void testQueryPhaseIsExecutedInAnAvailableNodeWhenAllShardsCanBeSkipped()
// busy assert since computing the time stamp field from the cluster state happens off of the CS applier thread and thus can be
// slightly delayed
assertBusy(() -> {
SearchResponse newSearchResponse = client().search(request).actionGet();

// All the regular index searches succeeded
assertThat(newSearchResponse.getSuccessfulShards(), equalTo(totalShards));
assertThat(newSearchResponse.getFailedShards(), equalTo(0));
// We have to query at least one node to construct a valid response, and we pick
// a shard that's available in order to construct the search response
assertThat(newSearchResponse.getSkippedShards(), equalTo(totalShards - 1));
assertThat(newSearchResponse.getTotalShards(), equalTo(totalShards));
assertThat(newSearchResponse.getHits().getTotalHits().value, equalTo(0L));
assertResponse(client().search(request), newSearchResponse -> {
// All the regular index searches succeeded
assertThat(newSearchResponse.getSuccessfulShards(), equalTo(totalShards));
assertThat(newSearchResponse.getFailedShards(), equalTo(0));
// We have to query at least one node to construct a valid response, and we pick
// a shard that's available in order to construct the search response
assertThat(newSearchResponse.getSkippedShards(), equalTo(totalShards - 1));
assertThat(newSearchResponse.getTotalShards(), equalTo(totalShards));
assertThat(newSearchResponse.getHits().getTotalHits().value, equalTo(0L));
});
});

// test with SearchShards API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchResponseUtils;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotsService;
Expand Down Expand Up @@ -117,19 +118,12 @@ public void testCreateAndRestoreSearchableSnapshot() throws Exception {

populateIndex(indexName, 10_000);

final TotalHits originalAllHits = internalCluster().client()
.prepareSearch(indexName)
.setTrackTotalHits(true)
.get()
.getHits()
.getTotalHits();
final TotalHits originalBarHits = internalCluster().client()
.prepareSearch(indexName)
.setTrackTotalHits(true)
.setQuery(matchQuery("foo", "bar"))
.get()
.getHits()
.getTotalHits();
final TotalHits originalAllHits = SearchResponseUtils.getTotalHits(
internalCluster().client().prepareSearch(indexName).setTrackTotalHits(true)
);
final TotalHits originalBarHits = SearchResponseUtils.getTotalHits(
internalCluster().client().prepareSearch(indexName).setTrackTotalHits(true).setQuery(matchQuery("foo", "bar"))
);
logger.info("--> [{}] in total, of which [{}] match the query", originalAllHits, originalBarHits);

expectThrows(
Expand Down Expand Up @@ -765,19 +759,12 @@ public void testSnapshotOfSearchableSnapshotIncludesNoDataButCanBeRestored() thr
Settings.builder().put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1).put(INDEX_SOFT_DELETES_SETTING.getKey(), true)
);

final TotalHits originalAllHits = internalCluster().client()
.prepareSearch(indexName)
.setTrackTotalHits(true)
.get()
.getHits()
.getTotalHits();
final TotalHits originalBarHits = internalCluster().client()
.prepareSearch(indexName)
.setTrackTotalHits(true)
.setQuery(matchQuery("foo", "bar"))
.get()
.getHits()
.getTotalHits();
final TotalHits originalAllHits = SearchResponseUtils.getTotalHits(
internalCluster().client().prepareSearch(indexName).setTrackTotalHits(true)
);
final TotalHits originalBarHits = SearchResponseUtils.getTotalHits(
internalCluster().client().prepareSearch(indexName).setTrackTotalHits(true).setQuery(matchQuery("foo", "bar"))
);
logger.info("--> [{}] in total, of which [{}] match the query", originalAllHits, originalBarHits);

// The repository that contains the actual data
Expand Down Expand Up @@ -936,19 +923,12 @@ public void testSnapshotOfSearchableSnapshotCanBeRestoredBeforeRepositoryRegiste
Settings.builder().put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1).put(INDEX_SOFT_DELETES_SETTING.getKey(), true)
);

final TotalHits originalAllHits = internalCluster().client()
.prepareSearch(indexName)
.setTrackTotalHits(true)
.get()
.getHits()
.getTotalHits();
final TotalHits originalBarHits = internalCluster().client()
.prepareSearch(indexName)
.setTrackTotalHits(true)
.setQuery(matchQuery("foo", "bar"))
.get()
.getHits()
.getTotalHits();
final TotalHits originalAllHits = SearchResponseUtils.getTotalHits(
internalCluster().client().prepareSearch(indexName).setTrackTotalHits(true)
);
final TotalHits originalBarHits = SearchResponseUtils.getTotalHits(
internalCluster().client().prepareSearch(indexName).setTrackTotalHits(true).setQuery(matchQuery("foo", "bar"))
);
logger.info("--> [{}] in total, of which [{}] match the query", originalAllHits, originalBarHits);

// Take snapshot containing the actual data to one repository
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.indices.recovery.plan.ShardSnapshotsService;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.search.SearchResponseUtils;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest;

Expand Down Expand Up @@ -43,12 +44,9 @@ public void testSearchableSnapshotRelocationDoNotUseSnapshotBasedRecoveries() th
.put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
);

final TotalHits totalHits = internalCluster().client()
.prepareSearch(indexName)
.setTrackTotalHits(true)
.get()
.getHits()
.getTotalHits();
final TotalHits totalHits = SearchResponseUtils.getTotalHits(
internalCluster().client().prepareSearch(indexName).setTrackTotalHits(true)
);

final var snapshotName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createSnapshot(repositoryName, snapshotName, List.of(indexName));
Expand Down
Loading

0 comments on commit 64818df

Please sign in to comment.