diff --git a/test/framework/src/main/java/org/elasticsearch/search/SearchResponseUtils.java b/test/framework/src/main/java/org/elasticsearch/search/SearchResponseUtils.java index e61b89fcff42c..589bc76c55a3d 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/SearchResponseUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/search/SearchResponseUtils.java @@ -7,17 +7,22 @@ */ package org.elasticsearch.search; +import org.apache.lucene.search.TotalHits; import org.elasticsearch.action.search.SearchRequestBuilder; public enum SearchResponseUtils { ; - public static long getTotalHitsValue(SearchRequestBuilder request) { + public static TotalHits getTotalHits(SearchRequestBuilder request) { var resp = request.get(); try { - return resp.getHits().getTotalHits().value; + return resp.getHits().getTotalHits(); } finally { resp.decRef(); } } + + public static long getTotalHitsValue(SearchRequestBuilder request) { + return getTotalHits(request).value; + } } diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java index d3bb435dc03ab..b7dc212fe12ad 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/BaseSearchableSnapshotsIntegTestCase.java @@ -64,6 +64,7 @@ import static org.elasticsearch.license.LicenseSettings.SELF_GENERATED_LICENSE_TYPE; import static org.elasticsearch.test.NodeRoles.addRoles; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse; import static org.elasticsearch.xpack.searchablesnapshots.cache.common.TestUtils.pageAligned; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -289,10 +290,10 @@ protected void assertTotalHits(String indexName, TotalHits originalAllHits, Tota } catch (InterruptedException e) { throw new RuntimeException(e); } - allHits.set(t, prepareSearch(indexName).setTrackTotalHits(true).get().getHits().getTotalHits()); - barHits.set( - t, - prepareSearch(indexName).setTrackTotalHits(true).setQuery(matchQuery("foo", "bar")).get().getHits().getTotalHits() + assertResponse(prepareSearch(indexName).setTrackTotalHits(true), resp -> allHits.set(t, resp.getHits().getTotalHits())); + assertResponse( + prepareSearch(indexName).setTrackTotalHits(true).setQuery(matchQuery("foo", "bar")), + resp -> barHits.set(t, resp.getHits().getTotalHits()) ); }); threads[i].start(); diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java index 0551ac3007f10..c80cf3c3d62e3 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java @@ -9,7 +9,6 @@ import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.ClosePointInTimeRequest; import org.elasticsearch.action.search.OpenPointInTimeRequest; -import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.search.TransportClosePointInTimeAction; import org.elasticsearch.action.search.TransportOpenPointInTimeAction; @@ -32,7 +31,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailuresAndResponse; import static org.hamcrest.Matchers.equalTo; public class RetrySearchIntegTests extends BaseSearchableSnapshotsIntegTestCase { @@ -144,30 +143,31 @@ public void testRetryPointInTime() throws Exception { ).keepAlive(TimeValue.timeValueMinutes(2)); final String pitId = client().execute(TransportOpenPointInTimeAction.TYPE, openRequest).actionGet().getPointInTimeId(); try { - SearchResponse resp = prepareSearch().setIndices(indexName) - .setPreference(null) - .setPointInTime(new PointInTimeBuilder(pitId)) - .get(); - assertNoFailures(resp); - assertThat(resp.pointInTimeId(), equalTo(pitId)); - assertHitCount(resp, docCount); - + assertNoFailuresAndResponse( + prepareSearch().setIndices(indexName).setPreference(null).setPointInTime(new PointInTimeBuilder(pitId)), + resp -> { + assertThat(resp.pointInTimeId(), equalTo(pitId)); + assertHitCount(resp, docCount); + } + ); final Set allocatedNodes = internalCluster().nodesInclude(indexName); for (String allocatedNode : allocatedNodes) { internalCluster().restartNode(allocatedNode); } ensureGreen(indexName); - resp = prepareSearch().setIndices(indexName) - .setQuery(new RangeQueryBuilder("created_date").gte("2011-01-01").lte("2011-12-12")) - .setSearchType(SearchType.QUERY_THEN_FETCH) - .setPreference(null) - .setPreFilterShardSize(between(1, 10)) - .setAllowPartialSearchResults(true) - .setPointInTime(new PointInTimeBuilder(pitId)) - .get(); - assertNoFailures(resp); - assertThat(resp.pointInTimeId(), equalTo(pitId)); - assertHitCount(resp, docCount); + assertNoFailuresAndResponse( + prepareSearch().setIndices(indexName) + .setQuery(new RangeQueryBuilder("created_date").gte("2011-01-01").lte("2011-12-12")) + .setSearchType(SearchType.QUERY_THEN_FETCH) + .setPreference(null) + .setPreFilterShardSize(between(1, 10)) + .setAllowPartialSearchResults(true) + .setPointInTime(new PointInTimeBuilder(pitId)), + resp -> { + assertThat(resp.pointInTimeId(), equalTo(pitId)); + assertHitCount(resp, docCount); + } + ); } finally { client().execute(TransportClosePointInTimeAction.TYPE, new ClosePointInTimeRequest(pitId)).actionGet(); } diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsCanMatchOnCoordinatorIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsCanMatchOnCoordinatorIntegTests.java index 844e6099460b2..a7a3b8e461604 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsCanMatchOnCoordinatorIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsCanMatchOnCoordinatorIntegTests.java @@ -52,6 +52,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING; import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -182,14 +183,14 @@ public void testSearchableSnapshotShardsAreSkippedBySearchRequestWithoutQuerying .source(new SearchSourceBuilder().query(rangeQuery)); if (includeIndexCoveringSearchRangeInSearchRequest) { - SearchResponse searchResponse = client().search(request).actionGet(); - - // All the regular index searches succeeded - assertThat(searchResponse.getSuccessfulShards(), equalTo(indexWithinSearchRangeShardCount)); - // All the searchable snapshots shard search failed - assertThat(searchResponse.getFailedShards(), equalTo(indexOutsideSearchRangeShardCount)); - assertThat(searchResponse.getSkippedShards(), equalTo(0)); - assertThat(searchResponse.getTotalShards(), equalTo(totalShards)); + assertResponse(client().search(request), searchResponse -> { + // All the regular index searches succeeded + assertThat(searchResponse.getSuccessfulShards(), equalTo(indexWithinSearchRangeShardCount)); + // All the searchable snapshots shard search failed + assertThat(searchResponse.getFailedShards(), equalTo(indexOutsideSearchRangeShardCount)); + assertThat(searchResponse.getSkippedShards(), equalTo(0)); + assertThat(searchResponse.getTotalShards(), equalTo(totalShards)); + }); } else { // All shards failed, since all shards are unassigned and the IndexMetadata min/max timestamp // is not available yet @@ -271,13 +272,13 @@ public void testSearchableSnapshotShardsAreSkippedBySearchRequestWithoutQuerying waitUntilAllShardsAreUnassigned(updatedIndexMetadata.getIndex()); if (includeIndexCoveringSearchRangeInSearchRequest) { - SearchResponse newSearchResponse = client().search(request).actionGet(); - - assertThat(newSearchResponse.getSkippedShards(), equalTo(indexOutsideSearchRangeShardCount)); - assertThat(newSearchResponse.getSuccessfulShards(), equalTo(totalShards)); - assertThat(newSearchResponse.getFailedShards(), equalTo(0)); - assertThat(newSearchResponse.getTotalShards(), equalTo(totalShards)); - assertThat(newSearchResponse.getHits().getTotalHits().value, equalTo((long) numDocsWithinRange)); + assertResponse(client().search(request), newSearchResponse -> { + assertThat(newSearchResponse.getSkippedShards(), equalTo(indexOutsideSearchRangeShardCount)); + assertThat(newSearchResponse.getSuccessfulShards(), equalTo(totalShards)); + assertThat(newSearchResponse.getFailedShards(), equalTo(0)); + assertThat(newSearchResponse.getTotalShards(), equalTo(totalShards)); + assertThat(newSearchResponse.getHits().getTotalHits().value, equalTo((long) numDocsWithinRange)); + }); // test with SearchShardsAPI { @@ -338,13 +339,14 @@ public void testSearchableSnapshotShardsAreSkippedBySearchRequestWithoutQuerying } } } else { - SearchResponse newSearchResponse = client().search(request).actionGet(); - // When all shards are skipped, at least one of them should be queried in order to - // provide a proper search response. - assertThat(newSearchResponse.getSkippedShards(), equalTo(indexOutsideSearchRangeShardCount - 1)); - assertThat(newSearchResponse.getSuccessfulShards(), equalTo(indexOutsideSearchRangeShardCount - 1)); - assertThat(newSearchResponse.getFailedShards(), equalTo(1)); - assertThat(newSearchResponse.getTotalShards(), equalTo(indexOutsideSearchRangeShardCount)); + assertResponse(client().search(request), newSearchResponse -> { + // When all shards are skipped, at least one of them should be queried in order to + // provide a proper search response. + assertThat(newSearchResponse.getSkippedShards(), equalTo(indexOutsideSearchRangeShardCount - 1)); + assertThat(newSearchResponse.getSuccessfulShards(), equalTo(indexOutsideSearchRangeShardCount - 1)); + assertThat(newSearchResponse.getFailedShards(), equalTo(1)); + assertThat(newSearchResponse.getTotalShards(), equalTo(indexOutsideSearchRangeShardCount)); + }); // test with SearchShardsAPI { @@ -449,14 +451,15 @@ public void testQueryPhaseIsExecutedInAnAvailableNodeWhenAllShardsCanBeSkipped() // test with Search API { - SearchResponse searchResponse = client().search(request).actionGet(); - // All the regular index searches succeeded - assertThat(searchResponse.getSuccessfulShards(), equalTo(indexOutsideSearchRangeShardCount)); - // All the searchable snapshots shard search failed - assertThat(searchResponse.getFailedShards(), equalTo(indexOutsideSearchRangeShardCount)); - assertThat(searchResponse.getSkippedShards(), equalTo(searchableSnapshotShardCount)); - assertThat(searchResponse.getTotalShards(), equalTo(totalShards)); - assertThat(searchResponse.getHits().getTotalHits().value, equalTo(0L)); + assertResponse(client().search(request), searchResponse -> { + // All the regular index searches succeeded + assertThat(searchResponse.getSuccessfulShards(), equalTo(indexOutsideSearchRangeShardCount)); + // All the searchable snapshots shard search failed + assertThat(searchResponse.getFailedShards(), equalTo(indexOutsideSearchRangeShardCount)); + assertThat(searchResponse.getSkippedShards(), equalTo(searchableSnapshotShardCount)); + assertThat(searchResponse.getTotalShards(), equalTo(totalShards)); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo(0L)); + }); } // test with SearchShards API @@ -513,16 +516,16 @@ public void testQueryPhaseIsExecutedInAnAvailableNodeWhenAllShardsCanBeSkipped() // busy assert since computing the time stamp field from the cluster state happens off of the CS applier thread and thus can be // slightly delayed assertBusy(() -> { - SearchResponse newSearchResponse = client().search(request).actionGet(); - - // All the regular index searches succeeded - assertThat(newSearchResponse.getSuccessfulShards(), equalTo(totalShards)); - assertThat(newSearchResponse.getFailedShards(), equalTo(0)); - // We have to query at least one node to construct a valid response, and we pick - // a shard that's available in order to construct the search response - assertThat(newSearchResponse.getSkippedShards(), equalTo(totalShards - 1)); - assertThat(newSearchResponse.getTotalShards(), equalTo(totalShards)); - assertThat(newSearchResponse.getHits().getTotalHits().value, equalTo(0L)); + assertResponse(client().search(request), newSearchResponse -> { + // All the regular index searches succeeded + assertThat(newSearchResponse.getSuccessfulShards(), equalTo(totalShards)); + assertThat(newSearchResponse.getFailedShards(), equalTo(0)); + // We have to query at least one node to construct a valid response, and we pick + // a shard that's available in order to construct the search response + assertThat(newSearchResponse.getSkippedShards(), equalTo(totalShards - 1)); + assertThat(newSearchResponse.getTotalShards(), equalTo(totalShards)); + assertThat(newSearchResponse.getHits().getTotalHits().value, equalTo(0L)); + }); }); // test with SearchShards API diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java index c3f5e44ae32a0..876ff9ebdb86f 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java @@ -46,6 +46,7 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchResponseUtils; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotsService; @@ -117,19 +118,12 @@ public void testCreateAndRestoreSearchableSnapshot() throws Exception { populateIndex(indexName, 10_000); - final TotalHits originalAllHits = internalCluster().client() - .prepareSearch(indexName) - .setTrackTotalHits(true) - .get() - .getHits() - .getTotalHits(); - final TotalHits originalBarHits = internalCluster().client() - .prepareSearch(indexName) - .setTrackTotalHits(true) - .setQuery(matchQuery("foo", "bar")) - .get() - .getHits() - .getTotalHits(); + final TotalHits originalAllHits = SearchResponseUtils.getTotalHits( + internalCluster().client().prepareSearch(indexName).setTrackTotalHits(true) + ); + final TotalHits originalBarHits = SearchResponseUtils.getTotalHits( + internalCluster().client().prepareSearch(indexName).setTrackTotalHits(true).setQuery(matchQuery("foo", "bar")) + ); logger.info("--> [{}] in total, of which [{}] match the query", originalAllHits, originalBarHits); expectThrows( @@ -765,19 +759,12 @@ public void testSnapshotOfSearchableSnapshotIncludesNoDataButCanBeRestored() thr Settings.builder().put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1).put(INDEX_SOFT_DELETES_SETTING.getKey(), true) ); - final TotalHits originalAllHits = internalCluster().client() - .prepareSearch(indexName) - .setTrackTotalHits(true) - .get() - .getHits() - .getTotalHits(); - final TotalHits originalBarHits = internalCluster().client() - .prepareSearch(indexName) - .setTrackTotalHits(true) - .setQuery(matchQuery("foo", "bar")) - .get() - .getHits() - .getTotalHits(); + final TotalHits originalAllHits = SearchResponseUtils.getTotalHits( + internalCluster().client().prepareSearch(indexName).setTrackTotalHits(true) + ); + final TotalHits originalBarHits = SearchResponseUtils.getTotalHits( + internalCluster().client().prepareSearch(indexName).setTrackTotalHits(true).setQuery(matchQuery("foo", "bar")) + ); logger.info("--> [{}] in total, of which [{}] match the query", originalAllHits, originalBarHits); // The repository that contains the actual data @@ -936,19 +923,12 @@ public void testSnapshotOfSearchableSnapshotCanBeRestoredBeforeRepositoryRegiste Settings.builder().put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1).put(INDEX_SOFT_DELETES_SETTING.getKey(), true) ); - final TotalHits originalAllHits = internalCluster().client() - .prepareSearch(indexName) - .setTrackTotalHits(true) - .get() - .getHits() - .getTotalHits(); - final TotalHits originalBarHits = internalCluster().client() - .prepareSearch(indexName) - .setTrackTotalHits(true) - .setQuery(matchQuery("foo", "bar")) - .get() - .getHits() - .getTotalHits(); + final TotalHits originalAllHits = SearchResponseUtils.getTotalHits( + internalCluster().client().prepareSearch(indexName).setTrackTotalHits(true) + ); + final TotalHits originalBarHits = SearchResponseUtils.getTotalHits( + internalCluster().client().prepareSearch(indexName).setTrackTotalHits(true).setQuery(matchQuery("foo", "bar")) + ); logger.info("--> [{}] in total, of which [{}] match the query", originalAllHits, originalBarHits); // Take snapshot containing the actual data to one repository diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRecoverFromSnapshotIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRecoverFromSnapshotIntegTests.java index 6f71f7c33bf06..894d3af8d75b8 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRecoverFromSnapshotIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRecoverFromSnapshotIntegTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.indices.recovery.plan.ShardSnapshotsService; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.repositories.fs.FsRepository; +import org.elasticsearch.search.SearchResponseUtils; import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest; @@ -43,12 +44,9 @@ public void testSearchableSnapshotRelocationDoNotUseSnapshotBasedRecoveries() th .put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) ); - final TotalHits totalHits = internalCluster().client() - .prepareSearch(indexName) - .setTrackTotalHits(true) - .get() - .getHits() - .getTotalHits(); + final TotalHits totalHits = SearchResponseUtils.getTotalHits( + internalCluster().client().prepareSearch(indexName).setTrackTotalHits(true) + ); final var snapshotName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); createSnapshot(repositoryName, snapshotName, List.of(indexName)); diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRepositoryIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRepositoryIntegTests.java index cb6cf45b641c6..f97151f9ae330 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRepositoryIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRepositoryIntegTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.core.Nullable; import org.elasticsearch.repositories.RepositoryConflictException; import org.elasticsearch.repositories.fs.FsRepository; +import org.elasticsearch.search.SearchResponseUtils; import org.elasticsearch.snapshots.SnapshotRestoreException; import java.util.Arrays; @@ -49,12 +50,9 @@ public void testRepositoryUsedBySearchableSnapshotCanBeUpdatedButNotUnregistered Settings.builder().put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1).put(INDEX_SOFT_DELETES_SETTING.getKey(), true) ); - final TotalHits totalHits = internalCluster().client() - .prepareSearch(indexName) - .setTrackTotalHits(true) - .get() - .getHits() - .getTotalHits(); + final TotalHits totalHits = SearchResponseUtils.getTotalHits( + internalCluster().client().prepareSearch(indexName).setTrackTotalHits(true) + ); final String snapshotName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); createSnapshot(repositoryName, snapshotName, List.of(indexName)); @@ -164,7 +162,9 @@ public void testMountIndexWithDifferentDeletionOfSnapshot() throws Exception { final String index = "index"; createAndPopulateIndex(index, Settings.builder().put(INDEX_SOFT_DELETES_SETTING.getKey(), true)); - final TotalHits totalHits = internalCluster().client().prepareSearch(index).setTrackTotalHits(true).get().getHits().getTotalHits(); + final TotalHits totalHits = SearchResponseUtils.getTotalHits( + internalCluster().client().prepareSearch(index).setTrackTotalHits(true) + ); final String snapshot = "snapshot"; createSnapshot(repository, snapshot, List.of(index)); @@ -220,7 +220,9 @@ public void testDeletionOfSnapshotSettingCannotBeUpdated() throws Exception { final String index = "index"; createAndPopulateIndex(index, Settings.builder().put(INDEX_SOFT_DELETES_SETTING.getKey(), true)); - final TotalHits totalHits = internalCluster().client().prepareSearch(index).setTrackTotalHits(true).get().getHits().getTotalHits(); + final TotalHits totalHits = SearchResponseUtils.getTotalHits( + internalCluster().client().prepareSearch(index).setTrackTotalHits(true) + ); final String snapshot = "snapshot"; createSnapshot(repository, snapshot, List.of(index)); diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheIntegTests.java index b5ebf1104a195..37b3ecfd36959 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheIntegTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.plugins.ClusterPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.reindex.ReindexPlugin; +import org.elasticsearch.search.SearchResponseUtils; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.xcontent.XContentBuilder; @@ -207,11 +208,9 @@ public void testBlobStoreCache() throws Exception { refreshSystemIndex(); - final long numberOfCachedBlobs = systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX) - .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) - .get() - .getHits() - .getTotalHits().value; + final long numberOfCachedBlobs = SearchResponseUtils.getTotalHitsValue( + systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) + ); IndexingStats indexingStats = systemClient().admin() .indices() diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java index 04233e47b7bcc..981ffe2832e66 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/SearchableSnapshotsBlobStoreCacheMaintenanceIntegTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.reindex.ReindexPlugin; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.fs.FsRepository; +import org.elasticsearch.search.SearchResponseUtils; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentFactory; @@ -318,16 +319,12 @@ private Client systemClient() { } private long numberOfEntriesInCache() { - var res = systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX) - .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) - .setTrackTotalHits(true) - .setSize(0) - .get(); - try { - return res.getHits().getTotalHits().value; - } finally { - res.decRef(); - } + return SearchResponseUtils.getTotalHitsValue( + systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX) + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) + .setTrackTotalHits(true) + .setSize(0) + ); } private void refreshSystemIndex(boolean failIfNotExist) { diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/shared/NodesCachesStatsIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/shared/NodesCachesStatsIntegTests.java index 3858b087f4d3a..42ac63579b6c6 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/shared/NodesCachesStatsIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/shared/NodesCachesStatsIntegTests.java @@ -112,7 +112,7 @@ public void testNodesCachesStats() throws Exception { randomBoolean() ? QueryBuilders.rangeQuery("id").gte(randomIntBetween(0, 1000)) : QueryBuilders.termQuery("test", "value" + randomIntBetween(0, 1000)) - ).setSize(randomIntBetween(0, 1000)).get(); + ).setSize(randomIntBetween(0, 1000)).get().decRef(); } assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java index 64508e1d49959..89cab65765bf9 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/blob/BlobStoreCacheMaintenanceService.java @@ -417,7 +417,7 @@ private class PeriodicMaintenanceTask implements Runnable, Releasable { private volatile Map> existingSnapshots; private volatile Set existingRepositories; - private volatile SearchResponse searchResponse; + private final AtomicReference searchResponse = new AtomicReference<>(); private volatile Instant expirationTime; private volatile String pointIntTimeId; private volatile Object[] searchAfter; @@ -458,146 +458,155 @@ public void onFailure(Exception e) { final String pitId = pointIntTimeId; assert Strings.hasLength(pitId); - if (searchResponse == null) { - final SearchSourceBuilder searchSource = new SearchSourceBuilder(); - searchSource.fetchField(new FieldAndFormat(CachedBlob.CREATION_TIME_FIELD, "epoch_millis")); - searchSource.fetchSource(false); - searchSource.trackScores(false); - searchSource.sort(ShardDocSortField.NAME); - searchSource.size(batchSize); - if (searchAfter != null) { - searchSource.searchAfter(searchAfter); - searchSource.trackTotalHits(false); - } else { - searchSource.trackTotalHits(true); + SearchResponse searchResponseRef; + do { + searchResponseRef = searchResponse.get(); + if (searchResponseRef == null) { + handleMissingSearchResponse(pitId); + return; } - final PointInTimeBuilder pointInTime = new PointInTimeBuilder(pitId); - pointInTime.setKeepAlive(keepAlive); - searchSource.pointInTimeBuilder(pointInTime); - final SearchRequest searchRequest = new SearchRequest(); - searchRequest.source(searchSource); - clientWithOrigin.execute(TransportSearchAction.TYPE, searchRequest, new ActionListener<>() { - @Override - public void onResponse(SearchResponse response) { - if (searchAfter == null) { - assert PeriodicMaintenanceTask.this.total.get() == 0L; - PeriodicMaintenanceTask.this.total.set(response.getHits().getTotalHits().value); - } - PeriodicMaintenanceTask.this.searchResponse = response; - PeriodicMaintenanceTask.this.searchAfter = null; - executeNext(PeriodicMaintenanceTask.this); - } - - @Override - public void onFailure(Exception e) { - complete(e); - } - }); - return; + } while (searchResponseRef.tryIncRef() == false); + try { + var searchHits = searchResponseRef.getHits().getHits(); + if (searchHits != null && searchHits.length > 0) { + updateWithSearchHits(searchHits); + return; + } + } finally { + searchResponseRef.decRef(); } + // we're done, complete the task + complete(null); + } catch (Exception e) { + complete(e); + } + } - final SearchHit[] searchHits = searchResponse.getHits().getHits(); - if (searchHits != null && searchHits.length > 0) { - if (expirationTime == null) { - final TimeValue retention = periodicTaskRetention; - expirationTime = Instant.ofEpochMilli(threadPool.absoluteTimeInMillis()) - .minus(retention.duration(), retention.timeUnit().toChronoUnit()); - - final ClusterState state = clusterService.state(); - // compute the list of existing searchable snapshots and repositories once - existingSnapshots = listSearchableSnapshots(state); - existingRepositories = RepositoriesMetadata.get(state) - .repositories() - .stream() - .map(RepositoryMetadata::name) - .collect(Collectors.toSet()); + private void handleMissingSearchResponse(String pitId) { + final SearchSourceBuilder searchSource = new SearchSourceBuilder(); + searchSource.fetchField(new FieldAndFormat(CachedBlob.CREATION_TIME_FIELD, "epoch_millis")); + searchSource.fetchSource(false); + searchSource.trackScores(false); + searchSource.sort(ShardDocSortField.NAME); + searchSource.size(batchSize); + if (searchAfter != null) { + searchSource.searchAfter(searchAfter); + searchSource.trackTotalHits(false); + } else { + searchSource.trackTotalHits(true); + } + final PointInTimeBuilder pointInTime = new PointInTimeBuilder(pitId); + pointInTime.setKeepAlive(keepAlive); + searchSource.pointInTimeBuilder(pointInTime); + final SearchRequest searchRequest = new SearchRequest(); + searchRequest.source(searchSource); + clientWithOrigin.execute(TransportSearchAction.TYPE, searchRequest, new ActionListener<>() { + @Override + public void onResponse(SearchResponse response) { + if (searchAfter == null) { + assert PeriodicMaintenanceTask.this.total.get() == 0L; + PeriodicMaintenanceTask.this.total.set(response.getHits().getTotalHits().value); } + PeriodicMaintenanceTask.this.setCurrentResponse(response); + PeriodicMaintenanceTask.this.searchAfter = null; + executeNext(PeriodicMaintenanceTask.this); + } - final BulkRequest bulkRequest = new BulkRequest(); - final Map> knownSnapshots = existingSnapshots; - assert knownSnapshots != null; - final Set knownRepositories = existingRepositories; - assert knownRepositories != null; - final Instant expirationTimeCopy = this.expirationTime; - assert expirationTimeCopy != null; - - Object[] lastSortValues = null; - for (SearchHit searchHit : searchHits) { - lastSortValues = searchHit.getSortValues(); - assert searchHit.getId() != null; - try { - boolean delete = false; - - // See {@link BlobStoreCacheService#generateId} - // doc id = {repository name}/{snapshot id}/{snapshot index id}/{shard id}/{file name}/@{file offset} - final String[] parts = Objects.requireNonNull(searchHit.getId()).split("/"); - assert parts.length == 6 : Arrays.toString(parts) + " vs " + searchHit.getId(); - - final String repositoryName = parts[0]; - if (knownRepositories.contains(repositoryName) == false) { - logger.trace("deleting blob store cache entry with id [{}]: repository does not exist", searchHit.getId()); - delete = true; - } else { - final Set knownIndexIds = knownSnapshots.get(parts[1]); - if (knownIndexIds == null || knownIndexIds.contains(parts[2]) == false) { - logger.trace("deleting blob store cache entry with id [{}]: not used", searchHit.getId()); - delete = true; - } - } - if (delete) { - final Instant creationTime = getCreationTime(searchHit); - if (creationTime.isAfter(expirationTimeCopy)) { - logger.trace( - "blob store cache entry with id [{}] was created recently, skipping deletion", - searchHit.getId() - ); - continue; - } - bulkRequest.add(new DeleteRequest().index(searchHit.getIndex()).id(searchHit.getId())); - } - } catch (Exception e) { - logger.warn( - () -> format("exception when parsing blob store cache entry with id [%s], skipping", searchHit.getId()), - e - ); + @Override + public void onFailure(Exception e) { + complete(e); + } + }); + } + + private void updateWithSearchHits(SearchHit[] searchHits) { + if (expirationTime == null) { + final TimeValue retention = periodicTaskRetention; + expirationTime = Instant.ofEpochMilli(threadPool.absoluteTimeInMillis()) + .minus(retention.duration(), retention.timeUnit().toChronoUnit()); + + final ClusterState state = clusterService.state(); + // compute the list of existing searchable snapshots and repositories once + existingSnapshots = listSearchableSnapshots(state); + existingRepositories = RepositoriesMetadata.get(state) + .repositories() + .stream() + .map(RepositoryMetadata::name) + .collect(Collectors.toSet()); + } + + final BulkRequest bulkRequest = new BulkRequest(); + final Map> knownSnapshots = existingSnapshots; + assert knownSnapshots != null; + final Set knownRepositories = existingRepositories; + assert knownRepositories != null; + final Instant expirationTimeCopy = this.expirationTime; + assert expirationTimeCopy != null; + + Object[] lastSortValues = null; + for (SearchHit searchHit : searchHits) { + lastSortValues = searchHit.getSortValues(); + assert searchHit.getId() != null; + try { + boolean delete = false; + + // See {@link BlobStoreCacheService#generateId} + // doc id = {repository name}/{snapshot id}/{snapshot index id}/{shard id}/{file name}/@{file offset} + final String[] parts = Objects.requireNonNull(searchHit.getId()).split("/"); + assert parts.length == 6 : Arrays.toString(parts) + " vs " + searchHit.getId(); + + final String repositoryName = parts[0]; + if (knownRepositories.contains(repositoryName) == false) { + logger.trace("deleting blob store cache entry with id [{}]: repository does not exist", searchHit.getId()); + delete = true; + } else { + final Set knownIndexIds = knownSnapshots.get(parts[1]); + if (knownIndexIds == null || knownIndexIds.contains(parts[2]) == false) { + logger.trace("deleting blob store cache entry with id [{}]: not used", searchHit.getId()); + delete = true; } } - - assert lastSortValues != null; - if (bulkRequest.numberOfActions() == 0) { - this.searchResponse = null; - this.searchAfter = lastSortValues; - executeNext(this); - return; + if (delete) { + final Instant creationTime = getCreationTime(searchHit); + if (creationTime.isAfter(expirationTimeCopy)) { + logger.trace("blob store cache entry with id [{}] was created recently, skipping deletion", searchHit.getId()); + continue; + } + bulkRequest.add(new DeleteRequest().index(searchHit.getIndex()).id(searchHit.getId())); } + } catch (Exception e) { + logger.warn(() -> format("exception when parsing blob store cache entry with id [%s], skipping", searchHit.getId()), e); + } + } - final Object[] finalSearchAfter = lastSortValues; - clientWithOrigin.execute(BulkAction.INSTANCE, bulkRequest, new ActionListener<>() { - @Override - public void onResponse(BulkResponse response) { - for (BulkItemResponse itemResponse : response.getItems()) { - if (itemResponse.isFailed() == false) { - assert itemResponse.getResponse() instanceof DeleteResponse; - PeriodicMaintenanceTask.this.deletes.incrementAndGet(); - } - } - PeriodicMaintenanceTask.this.searchResponse = null; - PeriodicMaintenanceTask.this.searchAfter = finalSearchAfter; - executeNext(PeriodicMaintenanceTask.this); - } + assert lastSortValues != null; + if (bulkRequest.numberOfActions() == 0) { + setCurrentResponse(null); + this.searchAfter = lastSortValues; + executeNext(this); + return; + } - @Override - public void onFailure(Exception e) { - complete(e); + final Object[] finalSearchAfter = lastSortValues; + clientWithOrigin.execute(BulkAction.INSTANCE, bulkRequest, new ActionListener<>() { + @Override + public void onResponse(BulkResponse response) { + for (BulkItemResponse itemResponse : response.getItems()) { + if (itemResponse.isFailed() == false) { + assert itemResponse.getResponse() instanceof DeleteResponse; + deletes.incrementAndGet(); } - }); - return; + } + PeriodicMaintenanceTask.this.setCurrentResponse(null); + PeriodicMaintenanceTask.this.searchAfter = finalSearchAfter; + executeNext(PeriodicMaintenanceTask.this); } - // we're done, complete the task - complete(null); - } catch (Exception e) { - complete(e); - } + + @Override + public void onFailure(Exception e) { + complete(e); + } + }); } public boolean isClosed() { @@ -614,6 +623,7 @@ private void ensureOpen() { @Override public void close() { if (closed.compareAndSet(false, true)) { + setCurrentResponse(null); final Exception e = error.get(); if (e != null) { logger.warn( @@ -679,6 +689,16 @@ public void onFailure(Exception e) { } } } + + private void setCurrentResponse(SearchResponse response) { + if (response != null) { + response.mustIncRef(); + } + var previous = searchResponse.getAndSet(response); + if (previous != null) { + previous.decRef(); + } + } } private void executeNext(PeriodicMaintenanceTask maintenanceTask) {