Skip to content

Commit

Permalink
Fix SearchResponse leaks in REST and Enrich tests (elastic#103533)
Browse files Browse the repository at this point in the history
It's in the title, fix a couple of tests here and there. Also, fix REST
tests in particular and add a utility to cleanly read a SearchResponse
from a REST response without leaking the parser.

for elastic#102030
  • Loading branch information
original-brownbear authored and navarone-feekery committed Dec 21, 2023
1 parent 04130ad commit 6b1f620
Show file tree
Hide file tree
Showing 11 changed files with 112 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@

import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Response;
import org.elasticsearch.test.rest.ESRestTestCase;

import java.io.IOException;

public enum SearchResponseUtils {
;
Expand All @@ -25,4 +30,10 @@ public static TotalHits getTotalHits(SearchRequestBuilder request) {
public static long getTotalHitsValue(SearchRequestBuilder request) {
return getTotalHits(request).value;
}

public static SearchResponse responseAsSearchResponse(Response searchResponse) throws IOException {
try (var parser = ESRestTestCase.responseAsParser(searchResponse)) {
return SearchResponse.fromXContent(parser);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1824,7 +1824,7 @@ protected static Map<String, Object> responseAsMap(Response response) throws IOE
return responseEntity;
}

protected static XContentParser responseAsParser(Response response) throws IOException {
public static XContentParser responseAsParser(Response response) throws IOException {
return XContentHelper.createParser(XContentParserConfiguration.EMPTY, responseAsBytes(response), XContentType.JSON);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,11 @@ protected void ensureTaskNotRunning(String id) throws Exception {
assertBusy(() -> {
try {
AsyncSearchResponse resp = getAsyncSearch(id);
assertFalse(resp.isRunning());
try {
assertFalse(resp.isRunning());
} finally {
resp.decRef();
}
} catch (Exception exc) {
if (ExceptionsHelper.unwrapCause(exc.getCause()) instanceof ResourceNotFoundException == false) {
throw exc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,13 @@ public void testAutoCreateIndex() throws Exception {
// To begin with, the results index should be auto-created.
AsyncExecutionId id = new AsyncExecutionId("0", new TaskId("N/A", 0));
AsyncSearchResponse resp = new AsyncSearchResponse(id.getEncoded(), true, true, 0L, 0L);
{
try {
PlainActionFuture<DocWriteResponse> future = new PlainActionFuture<>();
indexService.createResponse(id.getDocId(), Collections.emptyMap(), resp, future);
future.get();
assertSettings();
} finally {
resp.decRef();
}

// Delete the index, so we can test subsequent auto-create behaviour
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,26 +257,28 @@ protected <Request extends ActionRequest, Response extends ActionResponse> void
ActionListener<Response> listener
) {
assert EnrichCoordinatorProxyAction.NAME.equals(action.name());
var emptyResponse = new SearchResponse(
new InternalSearchResponse(
new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), 0.0f),
InternalAggregations.EMPTY,
new Suggest(Collections.emptyList()),
new SearchProfileResults(Collections.emptyMap()),
false,
false,
1
),
"",
1,
1,
0,
0,
ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY
);
requestCounter[0]++;
listener.onResponse((Response) emptyResponse);
ActionListener.respondAndRelease(
listener,
(Response) new SearchResponse(
new InternalSearchResponse(
new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), 0.0f),
InternalAggregations.EMPTY,
new Suggest(Collections.emptyList()),
new SearchProfileResults(Collections.emptyMap()),
false,
false,
1
),
"",
1,
1,
0,
0,
ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY
)
);
}
};
EnrichProcessorFactory factory = new EnrichProcessorFactory(client, scriptService, enrichCache);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -153,7 +154,7 @@ public void testWriteThreadLivenessBackToBack() throws Exception {
assertThat(firstFailure.getMessage(), containsString("Could not perform enrichment, enrich coordination queue at capacity"));

client().admin().indices().refresh(new RefreshRequest(enrichedIndexName)).actionGet();
assertEquals(successfulItems, client().search(new SearchRequest(enrichedIndexName)).actionGet().getHits().getTotalHits().value);
assertHitCount(client().search(new SearchRequest(enrichedIndexName)), successfulItems);
}

public void testWriteThreadLivenessWithPipeline() throws Exception {
Expand Down Expand Up @@ -276,6 +277,6 @@ public void testWriteThreadLivenessWithPipeline() throws Exception {
assertThat(firstFailure.getMessage(), containsString("Could not perform enrichment, enrich coordination queue at capacity"));

client().admin().indices().refresh(new RefreshRequest(enrichedIndexName)).actionGet();
assertEquals(successfulItems, client().search(new SearchRequest(enrichedIndexName)).actionGet().getHits().getTotalHits().value);
assertHitCount(client().search(new SearchRequest(enrichedIndexName)), successfulItems);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,7 @@ protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse>
}

try {
SearchResponse response = searchFunction.apply(buildSearchRequest());
nextPhase.onResponse(response);
ActionListener.respondAndRelease(nextPhase, searchFunction.apply(buildSearchRequest()));
} catch (Exception e) {
nextPhase.onFailure(e);
}
Expand Down Expand Up @@ -482,8 +481,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
null,
1
);
final SearchResponse response = new SearchResponse(sections, null, 1, 1, 0, 0, ShardSearchFailure.EMPTY_ARRAY, null);
nextPhase.onResponse(response);
ActionListener.respondAndRelease(
nextPhase,
new SearchResponse(sections, null, 1, 1, 0, 0, ShardSearchFailure.EMPTY_ARRAY, null)
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchResponseUtils;
import org.elasticsearch.test.rest.ObjectPath;

import java.io.IOException;
Expand Down Expand Up @@ -170,14 +171,19 @@ protected void assertSearchResponseContainsExpectedIndicesAndFields(
String[] expectedRemoteIndices,
String[] expectedFields
) {
try (var parser = responseAsParser(searchResponse)) {
try {
assertOK(searchResponse);
final var searchResult = Arrays.stream(SearchResponse.fromXContent(parser).getHits().getHits())
.collect(Collectors.toMap(SearchHit::getIndex, SearchHit::getSourceAsMap));
var response = SearchResponseUtils.responseAsSearchResponse(searchResponse);
try {
final var searchResult = Arrays.stream(response.getHits().getHits())
.collect(Collectors.toMap(SearchHit::getIndex, SearchHit::getSourceAsMap));

assertThat(searchResult.keySet(), containsInAnyOrder(expectedRemoteIndices));
for (String remoteIndex : expectedRemoteIndices) {
assertThat(searchResult.get(remoteIndex).keySet(), containsInAnyOrder(expectedFields));
assertThat(searchResult.keySet(), containsInAnyOrder(expectedRemoteIndices));
for (String remoteIndex : expectedRemoteIndices) {
assertThat(searchResult.get(remoteIndex).keySet(), containsInAnyOrder(expectedFields));
}
} finally {
response.decRef();
}
} catch (IOException e) {
throw new UncheckedIOException(e);
Expand All @@ -193,25 +199,30 @@ protected void assertSearchResponseContainsExpectedIndicesAndFields(
Response searchResponse,
Map<String, Set<String>> expectedRemoteIndicesAndFields
) {
try (var parser = responseAsParser(searchResponse)) {
try {
assertOK(searchResponse);
final var searchResult = Arrays.stream(SearchResponse.fromXContent(parser).getHits().getHits())
.collect(Collectors.toMap(SearchHit::getIndex, SearchHit::getSourceAsMap));
var response = SearchResponseUtils.responseAsSearchResponse(searchResponse);
try {
final var searchResult = Arrays.stream(response.getHits().getHits())
.collect(Collectors.toMap(SearchHit::getIndex, SearchHit::getSourceAsMap));

assertThat(searchResult.keySet(), equalTo(expectedRemoteIndicesAndFields.keySet()));
for (String remoteIndex : expectedRemoteIndicesAndFields.keySet()) {
Set<String> expectedFields = expectedRemoteIndicesAndFields.get(remoteIndex);
assertThat(searchResult.get(remoteIndex).keySet(), equalTo(expectedFields));
assertThat(searchResult.keySet(), equalTo(expectedRemoteIndicesAndFields.keySet()));
for (String remoteIndex : expectedRemoteIndicesAndFields.keySet()) {
Set<String> expectedFields = expectedRemoteIndicesAndFields.get(remoteIndex);
assertThat(searchResult.get(remoteIndex).keySet(), equalTo(expectedFields));
}
} finally {
response.decRef();
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

protected void assertSearchResponseContainsEmptyResult(Response response) {
try (var parser = responseAsParser(response)) {
try {
assertOK(response);
SearchResponse searchResponse = SearchResponse.fromXContent(parser);
SearchResponse searchResponse = SearchResponseUtils.responseAsSearchResponse(response);
try {
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(0L));
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void testReloadRemoteClusterCredentials() throws Exception {
configureRemoteCluster(remoteAddress);

// Run search to trigger header capturing on the receiving side
client().search(new SearchRequest(CLUSTER_ALIAS + ":index-a")).get();
client().search(new SearchRequest(CLUSTER_ALIAS + ":index-a")).get().decRef();

assertHeadersContainCredentialsThenClear(credentials, capturedHeaders);

Expand All @@ -135,7 +135,7 @@ public void testReloadRemoteClusterCredentials() throws Exception {
writeCredentialsToKeyStore(updatedCredentials);
reloadSecureSettings();

client().search(new SearchRequest(CLUSTER_ALIAS + ":index-a")).get();
client().search(new SearchRequest(CLUSTER_ALIAS + ":index-a")).get().decRef();

assertHeadersContainCredentialsThenClear(updatedCredentials, capturedHeaders);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,19 +182,21 @@ void stopExecutor() {}
null,
1
);
SearchResponse scrollSearchResponse = new SearchResponse(
scrollSearchSections,
"scrollId",
1,
1,
0,
10,
ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY
);
doAnswer(invocation -> {
ActionListener<SearchResponse> listener = (ActionListener<SearchResponse>) invocation.getArguments()[2];
listener.onResponse(scrollSearchResponse);
ActionListener.respondAndRelease(
listener,
new SearchResponse(
scrollSearchSections,
"scrollId",
1,
1,
0,
10,
ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY
)
);
return null;
}).when(client).execute(eq(TransportSearchScrollAction.TYPE), any(SearchScrollRequest.class), anyActionListener());

Expand Down Expand Up @@ -222,19 +224,12 @@ void stopExecutor() {}
}
SearchHits searchHits = new SearchHits(hits, new TotalHits(count, TotalHits.Relation.EQUAL_TO), 1.0f);
SearchResponseSections sections = new SearchResponseSections(searchHits, null, null, false, false, null, 1);
SearchResponse searchResponse = new SearchResponse(
sections,
"scrollId",
1,
1,
0,
10,
ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY
);
doAnswer(invocation -> {
ActionListener<SearchResponse> listener = (ActionListener<SearchResponse>) invocation.getArguments()[2];
listener.onResponse(searchResponse);
ActionListener.respondAndRelease(
listener,
new SearchResponse(sections, "scrollId", 1, 1, 0, 10, ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY)
);
return null;
}).when(client).execute(eq(TransportSearchAction.TYPE), any(SearchRequest.class), anyActionListener());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import org.apache.http.HttpHeaders;
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
Expand All @@ -17,6 +16,7 @@
import org.elasticsearch.client.WarningsHandler;
import org.elasticsearch.core.Strings;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchResponseUtils;
import org.elasticsearch.test.rest.ObjectPath;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -440,17 +440,22 @@ private List<String> getAllTokenIds() throws IOException {
}""");
final Response searchResponse = client().performRequest(searchRequest);
assertOK(searchResponse);
final SearchHits searchHits = SearchResponse.fromXContent(responseAsParser(searchResponse)).getHits();
assertThat(
"Search request used with size parameter that was too small to fetch all tokens.",
searchHits.getTotalHits().value,
lessThanOrEqualTo(searchSize)
);
final List<String> tokenIds = Arrays.stream(searchHits.getHits()).map(searchHit -> {
assertNotNull(searchHit.getId());
return searchHit.getId();
}).toList();
assertThat(tokenIds, not(empty()));
return tokenIds;
var response = SearchResponseUtils.responseAsSearchResponse(searchResponse);
try {
final SearchHits searchHits = response.getHits();
assertThat(
"Search request used with size parameter that was too small to fetch all tokens.",
searchHits.getTotalHits().value,
lessThanOrEqualTo(searchSize)
);
final List<String> tokenIds = Arrays.stream(searchHits.getHits()).map(searchHit -> {
assertNotNull(searchHit.getId());
return searchHit.getId();
}).toList();
assertThat(tokenIds, not(empty()));
return tokenIds;
} finally {
response.decRef();
}
}
}

0 comments on commit 6b1f620

Please sign in to comment.