Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix SearchResponse leaks in REST and Enrich tests #103533

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@

import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Response;
import org.elasticsearch.test.rest.ESRestTestCase;

import java.io.IOException;

public enum SearchResponseUtils {
;
Expand All @@ -25,4 +30,10 @@ public static TotalHits getTotalHits(SearchRequestBuilder request) {
public static long getTotalHitsValue(SearchRequestBuilder request) {
return getTotalHits(request).value;
}

public static SearchResponse responseAsSearchResponse(Response searchResponse) throws IOException {
try (var parser = ESRestTestCase.responseAsParser(searchResponse)) {
return SearchResponse.fromXContent(parser);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1824,7 +1824,7 @@ protected static Map<String, Object> responseAsMap(Response response) throws IOE
return responseEntity;
}

protected static XContentParser responseAsParser(Response response) throws IOException {
public static XContentParser responseAsParser(Response response) throws IOException {
return XContentHelper.createParser(XContentParserConfiguration.EMPTY, responseAsBytes(response), XContentType.JSON);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,11 @@ protected void ensureTaskNotRunning(String id) throws Exception {
assertBusy(() -> {
try {
AsyncSearchResponse resp = getAsyncSearch(id);
assertFalse(resp.isRunning());
try {
assertFalse(resp.isRunning());
} finally {
resp.decRef();
}
} catch (Exception exc) {
if (ExceptionsHelper.unwrapCause(exc.getCause()) instanceof ResourceNotFoundException == false) {
throw exc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,13 @@ public void testAutoCreateIndex() throws Exception {
// To begin with, the results index should be auto-created.
AsyncExecutionId id = new AsyncExecutionId("0", new TaskId("N/A", 0));
AsyncSearchResponse resp = new AsyncSearchResponse(id.getEncoded(), true, true, 0L, 0L);
{
try {
PlainActionFuture<DocWriteResponse> future = new PlainActionFuture<>();
indexService.createResponse(id.getDocId(), Collections.emptyMap(), resp, future);
future.get();
assertSettings();
} finally {
resp.decRef();
}

// Delete the index, so we can test subsequent auto-create behaviour
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,26 +257,28 @@ protected <Request extends ActionRequest, Response extends ActionResponse> void
ActionListener<Response> listener
) {
assert EnrichCoordinatorProxyAction.NAME.equals(action.name());
var emptyResponse = new SearchResponse(
new InternalSearchResponse(
new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), 0.0f),
InternalAggregations.EMPTY,
new Suggest(Collections.emptyList()),
new SearchProfileResults(Collections.emptyMap()),
false,
false,
1
),
"",
1,
1,
0,
0,
ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY
);
requestCounter[0]++;
listener.onResponse((Response) emptyResponse);
ActionListener.respondAndRelease(
listener,
(Response) new SearchResponse(
new InternalSearchResponse(
new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), 0.0f),
InternalAggregations.EMPTY,
new Suggest(Collections.emptyList()),
new SearchProfileResults(Collections.emptyMap()),
false,
false,
1
),
"",
1,
1,
0,
0,
ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY
)
);
}
};
EnrichProcessorFactory factory = new EnrichProcessorFactory(client, scriptService, enrichCache);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -153,7 +154,7 @@ public void testWriteThreadLivenessBackToBack() throws Exception {
assertThat(firstFailure.getMessage(), containsString("Could not perform enrichment, enrich coordination queue at capacity"));

client().admin().indices().refresh(new RefreshRequest(enrichedIndexName)).actionGet();
assertEquals(successfulItems, client().search(new SearchRequest(enrichedIndexName)).actionGet().getHits().getTotalHits().value);
assertHitCount(client().search(new SearchRequest(enrichedIndexName)), successfulItems);
}

public void testWriteThreadLivenessWithPipeline() throws Exception {
Expand Down Expand Up @@ -276,6 +277,6 @@ public void testWriteThreadLivenessWithPipeline() throws Exception {
assertThat(firstFailure.getMessage(), containsString("Could not perform enrichment, enrich coordination queue at capacity"));

client().admin().indices().refresh(new RefreshRequest(enrichedIndexName)).actionGet();
assertEquals(successfulItems, client().search(new SearchRequest(enrichedIndexName)).actionGet().getHits().getTotalHits().value);
assertHitCount(client().search(new SearchRequest(enrichedIndexName)), successfulItems);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,7 @@ protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse>
}

try {
SearchResponse response = searchFunction.apply(buildSearchRequest());
nextPhase.onResponse(response);
ActionListener.respondAndRelease(nextPhase, searchFunction.apply(buildSearchRequest()));
} catch (Exception e) {
nextPhase.onFailure(e);
}
Expand Down Expand Up @@ -482,8 +481,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
null,
1
);
final SearchResponse response = new SearchResponse(sections, null, 1, 1, 0, 0, ShardSearchFailure.EMPTY_ARRAY, null);
nextPhase.onResponse(response);
ActionListener.respondAndRelease(
nextPhase,
new SearchResponse(sections, null, 1, 1, 0, 0, ShardSearchFailure.EMPTY_ARRAY, null)
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchResponseUtils;
import org.elasticsearch.test.rest.ObjectPath;

import java.io.IOException;
Expand Down Expand Up @@ -170,14 +171,19 @@ protected void assertSearchResponseContainsExpectedIndicesAndFields(
String[] expectedRemoteIndices,
String[] expectedFields
) {
try (var parser = responseAsParser(searchResponse)) {
try {
assertOK(searchResponse);
final var searchResult = Arrays.stream(SearchResponse.fromXContent(parser).getHits().getHits())
.collect(Collectors.toMap(SearchHit::getIndex, SearchHit::getSourceAsMap));
var response = SearchResponseUtils.responseAsSearchResponse(searchResponse);
try {
final var searchResult = Arrays.stream(response.getHits().getHits())
.collect(Collectors.toMap(SearchHit::getIndex, SearchHit::getSourceAsMap));

assertThat(searchResult.keySet(), containsInAnyOrder(expectedRemoteIndices));
for (String remoteIndex : expectedRemoteIndices) {
assertThat(searchResult.get(remoteIndex).keySet(), containsInAnyOrder(expectedFields));
assertThat(searchResult.keySet(), containsInAnyOrder(expectedRemoteIndices));
for (String remoteIndex : expectedRemoteIndices) {
assertThat(searchResult.get(remoteIndex).keySet(), containsInAnyOrder(expectedFields));
}
} finally {
response.decRef();
}
} catch (IOException e) {
throw new UncheckedIOException(e);
Expand All @@ -193,25 +199,30 @@ protected void assertSearchResponseContainsExpectedIndicesAndFields(
Response searchResponse,
Map<String, Set<String>> expectedRemoteIndicesAndFields
) {
try (var parser = responseAsParser(searchResponse)) {
try {
assertOK(searchResponse);
final var searchResult = Arrays.stream(SearchResponse.fromXContent(parser).getHits().getHits())
.collect(Collectors.toMap(SearchHit::getIndex, SearchHit::getSourceAsMap));
var response = SearchResponseUtils.responseAsSearchResponse(searchResponse);
try {
final var searchResult = Arrays.stream(response.getHits().getHits())
.collect(Collectors.toMap(SearchHit::getIndex, SearchHit::getSourceAsMap));

assertThat(searchResult.keySet(), equalTo(expectedRemoteIndicesAndFields.keySet()));
for (String remoteIndex : expectedRemoteIndicesAndFields.keySet()) {
Set<String> expectedFields = expectedRemoteIndicesAndFields.get(remoteIndex);
assertThat(searchResult.get(remoteIndex).keySet(), equalTo(expectedFields));
assertThat(searchResult.keySet(), equalTo(expectedRemoteIndicesAndFields.keySet()));
for (String remoteIndex : expectedRemoteIndicesAndFields.keySet()) {
Set<String> expectedFields = expectedRemoteIndicesAndFields.get(remoteIndex);
assertThat(searchResult.get(remoteIndex).keySet(), equalTo(expectedFields));
}
} finally {
response.decRef();
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

protected void assertSearchResponseContainsEmptyResult(Response response) {
try (var parser = responseAsParser(response)) {
try {
assertOK(response);
SearchResponse searchResponse = SearchResponse.fromXContent(parser);
SearchResponse searchResponse = SearchResponseUtils.responseAsSearchResponse(response);
try {
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(0L));
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void testReloadRemoteClusterCredentials() throws Exception {
configureRemoteCluster(remoteAddress);

// Run search to trigger header capturing on the receiving side
client().search(new SearchRequest(CLUSTER_ALIAS + ":index-a")).get();
client().search(new SearchRequest(CLUSTER_ALIAS + ":index-a")).get().decRef();

assertHeadersContainCredentialsThenClear(credentials, capturedHeaders);

Expand All @@ -135,7 +135,7 @@ public void testReloadRemoteClusterCredentials() throws Exception {
writeCredentialsToKeyStore(updatedCredentials);
reloadSecureSettings();

client().search(new SearchRequest(CLUSTER_ALIAS + ":index-a")).get();
client().search(new SearchRequest(CLUSTER_ALIAS + ":index-a")).get().decRef();

assertHeadersContainCredentialsThenClear(updatedCredentials, capturedHeaders);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,19 +182,21 @@ void stopExecutor() {}
null,
1
);
SearchResponse scrollSearchResponse = new SearchResponse(
scrollSearchSections,
"scrollId",
1,
1,
0,
10,
ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY
);
doAnswer(invocation -> {
ActionListener<SearchResponse> listener = (ActionListener<SearchResponse>) invocation.getArguments()[2];
listener.onResponse(scrollSearchResponse);
ActionListener.respondAndRelease(
listener,
new SearchResponse(
scrollSearchSections,
"scrollId",
1,
1,
0,
10,
ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY
)
);
return null;
}).when(client).execute(eq(TransportSearchScrollAction.TYPE), any(SearchScrollRequest.class), anyActionListener());

Expand Down Expand Up @@ -222,19 +224,12 @@ void stopExecutor() {}
}
SearchHits searchHits = new SearchHits(hits, new TotalHits(count, TotalHits.Relation.EQUAL_TO), 1.0f);
SearchResponseSections sections = new SearchResponseSections(searchHits, null, null, false, false, null, 1);
SearchResponse searchResponse = new SearchResponse(
sections,
"scrollId",
1,
1,
0,
10,
ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY
);
doAnswer(invocation -> {
ActionListener<SearchResponse> listener = (ActionListener<SearchResponse>) invocation.getArguments()[2];
listener.onResponse(searchResponse);
ActionListener.respondAndRelease(
listener,
new SearchResponse(sections, "scrollId", 1, 1, 0, 10, ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY)
);
return null;
}).when(client).execute(eq(TransportSearchAction.TYPE), any(SearchRequest.class), anyActionListener());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import org.apache.http.HttpHeaders;
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
Expand All @@ -17,6 +16,7 @@
import org.elasticsearch.client.WarningsHandler;
import org.elasticsearch.core.Strings;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchResponseUtils;
import org.elasticsearch.test.rest.ObjectPath;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -440,17 +440,22 @@ private List<String> getAllTokenIds() throws IOException {
}""");
final Response searchResponse = client().performRequest(searchRequest);
assertOK(searchResponse);
final SearchHits searchHits = SearchResponse.fromXContent(responseAsParser(searchResponse)).getHits();
assertThat(
"Search request used with size parameter that was too small to fetch all tokens.",
searchHits.getTotalHits().value,
lessThanOrEqualTo(searchSize)
);
final List<String> tokenIds = Arrays.stream(searchHits.getHits()).map(searchHit -> {
assertNotNull(searchHit.getId());
return searchHit.getId();
}).toList();
assertThat(tokenIds, not(empty()));
return tokenIds;
var response = SearchResponseUtils.responseAsSearchResponse(searchResponse);
try {
final SearchHits searchHits = response.getHits();
assertThat(
"Search request used with size parameter that was too small to fetch all tokens.",
searchHits.getTotalHits().value,
lessThanOrEqualTo(searchSize)
);
final List<String> tokenIds = Arrays.stream(searchHits.getHits()).map(searchHit -> {
assertNotNull(searchHit.getId());
return searchHit.getId();
}).toList();
assertThat(tokenIds, not(empty()));
return tokenIds;
} finally {
response.decRef();
}
}
}