Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix SearchResponse leaks in searchable snapshot test and production code #103118

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,22 @@
*/
package org.elasticsearch.search;

import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.search.SearchRequestBuilder;

public enum SearchResponseUtils {
;

public static long getTotalHitsValue(SearchRequestBuilder request) {
public static TotalHits getTotalHits(SearchRequestBuilder request) {
var resp = request.get();
try {
return resp.getHits().getTotalHits().value;
return resp.getHits().getTotalHits();
} finally {
resp.decRef();
}
}

public static long getTotalHitsValue(SearchRequestBuilder request) {
return getTotalHits(request).value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import static org.elasticsearch.license.LicenseSettings.SELF_GENERATED_LICENSE_TYPE;
import static org.elasticsearch.test.NodeRoles.addRoles;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
import static org.elasticsearch.xpack.searchablesnapshots.cache.common.TestUtils.pageAligned;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
Expand Down Expand Up @@ -289,10 +290,10 @@ protected void assertTotalHits(String indexName, TotalHits originalAllHits, Tota
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
allHits.set(t, prepareSearch(indexName).setTrackTotalHits(true).get().getHits().getTotalHits());
barHits.set(
t,
prepareSearch(indexName).setTrackTotalHits(true).setQuery(matchQuery("foo", "bar")).get().getHits().getTotalHits()
assertResponse(prepareSearch(indexName).setTrackTotalHits(true), resp -> allHits.set(t, resp.getHits().getTotalHits()));
assertResponse(
prepareSearch(indexName).setTrackTotalHits(true).setQuery(matchQuery("foo", "bar")),
resp -> barHits.set(t, resp.getHits().getTotalHits())
);
});
threads[i].start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.ClosePointInTimeRequest;
import org.elasticsearch.action.search.OpenPointInTimeRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.search.TransportClosePointInTimeAction;
import org.elasticsearch.action.search.TransportOpenPointInTimeAction;
Expand All @@ -32,7 +31,7 @@

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailuresAndResponse;
import static org.hamcrest.Matchers.equalTo;

public class RetrySearchIntegTests extends BaseSearchableSnapshotsIntegTestCase {
Expand Down Expand Up @@ -144,30 +143,31 @@ public void testRetryPointInTime() throws Exception {
).keepAlive(TimeValue.timeValueMinutes(2));
final String pitId = client().execute(TransportOpenPointInTimeAction.TYPE, openRequest).actionGet().getPointInTimeId();
try {
SearchResponse resp = prepareSearch().setIndices(indexName)
.setPreference(null)
.setPointInTime(new PointInTimeBuilder(pitId))
.get();
assertNoFailures(resp);
assertThat(resp.pointInTimeId(), equalTo(pitId));
assertHitCount(resp, docCount);

assertNoFailuresAndResponse(
prepareSearch().setIndices(indexName).setPreference(null).setPointInTime(new PointInTimeBuilder(pitId)),
resp -> {
assertThat(resp.pointInTimeId(), equalTo(pitId));
assertHitCount(resp, docCount);
}
);
final Set<String> allocatedNodes = internalCluster().nodesInclude(indexName);
for (String allocatedNode : allocatedNodes) {
internalCluster().restartNode(allocatedNode);
}
ensureGreen(indexName);
resp = prepareSearch().setIndices(indexName)
.setQuery(new RangeQueryBuilder("created_date").gte("2011-01-01").lte("2011-12-12"))
.setSearchType(SearchType.QUERY_THEN_FETCH)
.setPreference(null)
.setPreFilterShardSize(between(1, 10))
.setAllowPartialSearchResults(true)
.setPointInTime(new PointInTimeBuilder(pitId))
.get();
assertNoFailures(resp);
assertThat(resp.pointInTimeId(), equalTo(pitId));
assertHitCount(resp, docCount);
assertNoFailuresAndResponse(
prepareSearch().setIndices(indexName)
.setQuery(new RangeQueryBuilder("created_date").gte("2011-01-01").lte("2011-12-12"))
.setSearchType(SearchType.QUERY_THEN_FETCH)
.setPreference(null)
.setPreFilterShardSize(between(1, 10))
.setAllowPartialSearchResults(true)
.setPointInTime(new PointInTimeBuilder(pitId)),
resp -> {
assertThat(resp.pointInTimeId(), equalTo(pitId));
assertHitCount(resp, docCount);
}
);
} finally {
client().execute(TransportClosePointInTimeAction.TYPE, new ClosePointInTimeRequest(pitId)).actionGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING;
import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
Expand Down Expand Up @@ -182,14 +183,14 @@ public void testSearchableSnapshotShardsAreSkippedBySearchRequestWithoutQuerying
.source(new SearchSourceBuilder().query(rangeQuery));

if (includeIndexCoveringSearchRangeInSearchRequest) {
SearchResponse searchResponse = client().search(request).actionGet();

// All the regular index searches succeeded
assertThat(searchResponse.getSuccessfulShards(), equalTo(indexWithinSearchRangeShardCount));
// All the searchable snapshots shard search failed
assertThat(searchResponse.getFailedShards(), equalTo(indexOutsideSearchRangeShardCount));
assertThat(searchResponse.getSkippedShards(), equalTo(0));
assertThat(searchResponse.getTotalShards(), equalTo(totalShards));
assertResponse(client().search(request), searchResponse -> {
// All the regular index searches succeeded
assertThat(searchResponse.getSuccessfulShards(), equalTo(indexWithinSearchRangeShardCount));
// All the searchable snapshots shard search failed
assertThat(searchResponse.getFailedShards(), equalTo(indexOutsideSearchRangeShardCount));
assertThat(searchResponse.getSkippedShards(), equalTo(0));
assertThat(searchResponse.getTotalShards(), equalTo(totalShards));
});
} else {
// All shards failed, since all shards are unassigned and the IndexMetadata min/max timestamp
// is not available yet
Expand Down Expand Up @@ -271,13 +272,13 @@ public void testSearchableSnapshotShardsAreSkippedBySearchRequestWithoutQuerying
waitUntilAllShardsAreUnassigned(updatedIndexMetadata.getIndex());

if (includeIndexCoveringSearchRangeInSearchRequest) {
SearchResponse newSearchResponse = client().search(request).actionGet();

assertThat(newSearchResponse.getSkippedShards(), equalTo(indexOutsideSearchRangeShardCount));
assertThat(newSearchResponse.getSuccessfulShards(), equalTo(totalShards));
assertThat(newSearchResponse.getFailedShards(), equalTo(0));
assertThat(newSearchResponse.getTotalShards(), equalTo(totalShards));
assertThat(newSearchResponse.getHits().getTotalHits().value, equalTo((long) numDocsWithinRange));
assertResponse(client().search(request), newSearchResponse -> {
assertThat(newSearchResponse.getSkippedShards(), equalTo(indexOutsideSearchRangeShardCount));
assertThat(newSearchResponse.getSuccessfulShards(), equalTo(totalShards));
assertThat(newSearchResponse.getFailedShards(), equalTo(0));
assertThat(newSearchResponse.getTotalShards(), equalTo(totalShards));
assertThat(newSearchResponse.getHits().getTotalHits().value, equalTo((long) numDocsWithinRange));
});

// test with SearchShardsAPI
{
Expand Down Expand Up @@ -338,13 +339,14 @@ public void testSearchableSnapshotShardsAreSkippedBySearchRequestWithoutQuerying
}
}
} else {
SearchResponse newSearchResponse = client().search(request).actionGet();
// When all shards are skipped, at least one of them should be queried in order to
// provide a proper search response.
assertThat(newSearchResponse.getSkippedShards(), equalTo(indexOutsideSearchRangeShardCount - 1));
assertThat(newSearchResponse.getSuccessfulShards(), equalTo(indexOutsideSearchRangeShardCount - 1));
assertThat(newSearchResponse.getFailedShards(), equalTo(1));
assertThat(newSearchResponse.getTotalShards(), equalTo(indexOutsideSearchRangeShardCount));
assertResponse(client().search(request), newSearchResponse -> {
// When all shards are skipped, at least one of them should be queried in order to
// provide a proper search response.
assertThat(newSearchResponse.getSkippedShards(), equalTo(indexOutsideSearchRangeShardCount - 1));
assertThat(newSearchResponse.getSuccessfulShards(), equalTo(indexOutsideSearchRangeShardCount - 1));
assertThat(newSearchResponse.getFailedShards(), equalTo(1));
assertThat(newSearchResponse.getTotalShards(), equalTo(indexOutsideSearchRangeShardCount));
});

// test with SearchShardsAPI
{
Expand Down Expand Up @@ -449,14 +451,15 @@ public void testQueryPhaseIsExecutedInAnAvailableNodeWhenAllShardsCanBeSkipped()

// test with Search API
{
SearchResponse searchResponse = client().search(request).actionGet();
// All the regular index searches succeeded
assertThat(searchResponse.getSuccessfulShards(), equalTo(indexOutsideSearchRangeShardCount));
// All the searchable snapshots shard search failed
assertThat(searchResponse.getFailedShards(), equalTo(indexOutsideSearchRangeShardCount));
assertThat(searchResponse.getSkippedShards(), equalTo(searchableSnapshotShardCount));
assertThat(searchResponse.getTotalShards(), equalTo(totalShards));
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(0L));
assertResponse(client().search(request), searchResponse -> {
// All the regular index searches succeeded
assertThat(searchResponse.getSuccessfulShards(), equalTo(indexOutsideSearchRangeShardCount));
// All the searchable snapshots shard search failed
assertThat(searchResponse.getFailedShards(), equalTo(indexOutsideSearchRangeShardCount));
assertThat(searchResponse.getSkippedShards(), equalTo(searchableSnapshotShardCount));
assertThat(searchResponse.getTotalShards(), equalTo(totalShards));
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(0L));
});
}

// test with SearchShards API
Expand Down Expand Up @@ -513,16 +516,16 @@ public void testQueryPhaseIsExecutedInAnAvailableNodeWhenAllShardsCanBeSkipped()
// busy assert since computing the time stamp field from the cluster state happens off of the CS applier thread and thus can be
// slightly delayed
assertBusy(() -> {
SearchResponse newSearchResponse = client().search(request).actionGet();

// All the regular index searches succeeded
assertThat(newSearchResponse.getSuccessfulShards(), equalTo(totalShards));
assertThat(newSearchResponse.getFailedShards(), equalTo(0));
// We have to query at least one node to construct a valid response, and we pick
// a shard that's available in order to construct the search response
assertThat(newSearchResponse.getSkippedShards(), equalTo(totalShards - 1));
assertThat(newSearchResponse.getTotalShards(), equalTo(totalShards));
assertThat(newSearchResponse.getHits().getTotalHits().value, equalTo(0L));
assertResponse(client().search(request), newSearchResponse -> {
// All the regular index searches succeeded
assertThat(newSearchResponse.getSuccessfulShards(), equalTo(totalShards));
assertThat(newSearchResponse.getFailedShards(), equalTo(0));
// We have to query at least one node to construct a valid response, and we pick
// a shard that's available in order to construct the search response
assertThat(newSearchResponse.getSkippedShards(), equalTo(totalShards - 1));
assertThat(newSearchResponse.getTotalShards(), equalTo(totalShards));
assertThat(newSearchResponse.getHits().getTotalHits().value, equalTo(0L));
});
});

// test with SearchShards API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchResponseUtils;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotsService;
Expand Down Expand Up @@ -117,19 +118,12 @@ public void testCreateAndRestoreSearchableSnapshot() throws Exception {

populateIndex(indexName, 10_000);

final TotalHits originalAllHits = internalCluster().client()
.prepareSearch(indexName)
.setTrackTotalHits(true)
.get()
.getHits()
.getTotalHits();
final TotalHits originalBarHits = internalCluster().client()
.prepareSearch(indexName)
.setTrackTotalHits(true)
.setQuery(matchQuery("foo", "bar"))
.get()
.getHits()
.getTotalHits();
final TotalHits originalAllHits = SearchResponseUtils.getTotalHits(
internalCluster().client().prepareSearch(indexName).setTrackTotalHits(true)
);
final TotalHits originalBarHits = SearchResponseUtils.getTotalHits(
internalCluster().client().prepareSearch(indexName).setTrackTotalHits(true).setQuery(matchQuery("foo", "bar"))
);
logger.info("--> [{}] in total, of which [{}] match the query", originalAllHits, originalBarHits);

expectThrows(
Expand Down Expand Up @@ -765,19 +759,12 @@ public void testSnapshotOfSearchableSnapshotIncludesNoDataButCanBeRestored() thr
Settings.builder().put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1).put(INDEX_SOFT_DELETES_SETTING.getKey(), true)
);

final TotalHits originalAllHits = internalCluster().client()
.prepareSearch(indexName)
.setTrackTotalHits(true)
.get()
.getHits()
.getTotalHits();
final TotalHits originalBarHits = internalCluster().client()
.prepareSearch(indexName)
.setTrackTotalHits(true)
.setQuery(matchQuery("foo", "bar"))
.get()
.getHits()
.getTotalHits();
final TotalHits originalAllHits = SearchResponseUtils.getTotalHits(
internalCluster().client().prepareSearch(indexName).setTrackTotalHits(true)
);
final TotalHits originalBarHits = SearchResponseUtils.getTotalHits(
internalCluster().client().prepareSearch(indexName).setTrackTotalHits(true).setQuery(matchQuery("foo", "bar"))
);
logger.info("--> [{}] in total, of which [{}] match the query", originalAllHits, originalBarHits);

// The repository that contains the actual data
Expand Down Expand Up @@ -936,19 +923,12 @@ public void testSnapshotOfSearchableSnapshotCanBeRestoredBeforeRepositoryRegiste
Settings.builder().put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1).put(INDEX_SOFT_DELETES_SETTING.getKey(), true)
);

final TotalHits originalAllHits = internalCluster().client()
.prepareSearch(indexName)
.setTrackTotalHits(true)
.get()
.getHits()
.getTotalHits();
final TotalHits originalBarHits = internalCluster().client()
.prepareSearch(indexName)
.setTrackTotalHits(true)
.setQuery(matchQuery("foo", "bar"))
.get()
.getHits()
.getTotalHits();
final TotalHits originalAllHits = SearchResponseUtils.getTotalHits(
internalCluster().client().prepareSearch(indexName).setTrackTotalHits(true)
);
final TotalHits originalBarHits = SearchResponseUtils.getTotalHits(
internalCluster().client().prepareSearch(indexName).setTrackTotalHits(true).setQuery(matchQuery("foo", "bar"))
);
logger.info("--> [{}] in total, of which [{}] match the query", originalAllHits, originalBarHits);

// Take snapshot containing the actual data to one repository
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.indices.recovery.plan.ShardSnapshotsService;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.search.SearchResponseUtils;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest;

Expand Down Expand Up @@ -43,12 +44,9 @@ public void testSearchableSnapshotRelocationDoNotUseSnapshotBasedRecoveries() th
.put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
);

final TotalHits totalHits = internalCluster().client()
.prepareSearch(indexName)
.setTrackTotalHits(true)
.get()
.getHits()
.getTotalHits();
final TotalHits totalHits = SearchResponseUtils.getTotalHits(
internalCluster().client().prepareSearch(indexName).setTrackTotalHits(true)
);

final var snapshotName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createSnapshot(repositoryName, snapshotName, List.of(indexName));
Expand Down
Loading