Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix more search response leaks in tests #103957

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.atomic.LongAdder;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

Expand Down Expand Up @@ -136,9 +137,9 @@ public void testRetrieveSnapshots() throws Exception {
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));

logger.info("--> run a search");
var searchResponse = client.prepareSearch("test-idx").setQuery(QueryBuilders.termQuery("text", "sometext")).get();

assertThat(searchResponse.getHits().getTotalHits().value, greaterThan(0L));
assertThat(COUNTS.intValue(), greaterThan(0));
assertResponse(client.prepareSearch("test-idx").setQuery(QueryBuilders.termQuery("text", "sometext")), searchResponse -> {
assertThat(searchResponse.getHits().getTotalHits().value, greaterThan(0L));
assertThat(COUNTS.intValue(), greaterThan(0));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.xpack.search;

import org.apache.http.util.EntityUtils;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
Expand Down Expand Up @@ -182,15 +183,19 @@ private void testCase(String user, String other) throws Exception {
private SearchHit[] getSearchHits(String asyncId, String user) throws IOException {
final Response resp = getAsyncSearch(asyncId, user);
assertOK(resp);
AsyncSearchResponse searchResponse = AsyncSearchResponse.fromXContent(
SearchResponse searchResponse = AsyncSearchResponse.fromXContent(
XContentHelper.createParser(
NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE,
new BytesArray(EntityUtils.toByteArray(resp.getEntity())),
XContentType.JSON
)
);
return searchResponse.getSearchResponse().getHits().getHits();
).getSearchResponse();
try {
return searchResponse.getHits().getHits();
} finally {
searchResponse.decRef();
}
}

public void testAuthorizationOfPointInTime() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -865,23 +865,25 @@ protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse>
} catch (IOException e) {
listener.onFailure(e);
}
SearchResponse response = new SearchResponse(
null,
new Aggregations(Collections.singletonList(result)),
null,
false,
null,
null,
1,
null,
1,
1,
0,
0,
ShardSearchFailure.EMPTY_ARRAY,
null
ActionListener.respondAndRelease(
listener,
new SearchResponse(
null,
new Aggregations(Collections.singletonList(result)),
null,
false,
null,
null,
1,
null,
1,
1,
0,
0,
ShardSearchFailure.EMPTY_ARRAY,
null
)
);
listener.onResponse(response);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,23 +105,25 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return null;
}
}));
final SearchResponse response = new SearchResponse(
new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), 0),
aggs,
null,
false,
null,
null,
1,
null,
1,
1,
0,
0,
new ShardSearchFailure[0],
null
ActionListener.respondAndRelease(
nextPhase,
new SearchResponse(
new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), 0),
aggs,
null,
false,
null,
null,
1,
null,
1,
1,
0,
0,
new ShardSearchFailure[0],
null
)
);
nextPhase.onResponse(response);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,36 +178,41 @@ private void testSourceHasChanged(
TimeValue delay,
Tuple<Long, Long> expectedRangeQueryBounds
) throws InterruptedException {
doAnswer(withResponse(newSearchResponse(totalHits))).when(client).execute(eq(TransportSearchAction.TYPE), any(), any());
String transformId = getTestName();
TransformConfig transformConfig = newTransformConfigWithDateHistogram(
transformId,
transformVersion,
dateHistogramField,
dateHistogramInterval,
delay
);
TimeBasedCheckpointProvider provider = newCheckpointProvider(transformConfig);
final SearchResponse searchResponse = newSearchResponse(totalHits);
try {
doAnswer(withResponse(searchResponse)).when(client).execute(eq(TransportSearchAction.TYPE), any(), any());
String transformId = getTestName();
TransformConfig transformConfig = newTransformConfigWithDateHistogram(
transformId,
transformVersion,
dateHistogramField,
dateHistogramInterval,
delay
);
TimeBasedCheckpointProvider provider = newCheckpointProvider(transformConfig);

SetOnce<Boolean> hasChangedHolder = new SetOnce<>();
SetOnce<Exception> exceptionHolder = new SetOnce<>();
CountDownLatch latch = new CountDownLatch(1);
provider.sourceHasChanged(
lastCheckpoint,
new LatchedActionListener<>(ActionListener.wrap(hasChangedHolder::set, exceptionHolder::set), latch)
);
assertThat(latch.await(100, TimeUnit.MILLISECONDS), is(true));
SetOnce<Boolean> hasChangedHolder = new SetOnce<>();
SetOnce<Exception> exceptionHolder = new SetOnce<>();
CountDownLatch latch = new CountDownLatch(1);
provider.sourceHasChanged(
lastCheckpoint,
new LatchedActionListener<>(ActionListener.wrap(hasChangedHolder::set, exceptionHolder::set), latch)
);
assertThat(latch.await(100, TimeUnit.MILLISECONDS), is(true));

ArgumentCaptor<SearchRequest> searchRequestArgumentCaptor = ArgumentCaptor.forClass(SearchRequest.class);
verify(client).execute(eq(TransportSearchAction.TYPE), searchRequestArgumentCaptor.capture(), any());
SearchRequest searchRequest = searchRequestArgumentCaptor.getValue();
BoolQueryBuilder boolQuery = (BoolQueryBuilder) searchRequest.source().query();
RangeQueryBuilder rangeQuery = (RangeQueryBuilder) boolQuery.filter().get(1);
assertThat(rangeQuery.from(), is(equalTo(expectedRangeQueryBounds.v1())));
assertThat(rangeQuery.to(), is(equalTo(expectedRangeQueryBounds.v2())));
ArgumentCaptor<SearchRequest> searchRequestArgumentCaptor = ArgumentCaptor.forClass(SearchRequest.class);
verify(client).execute(eq(TransportSearchAction.TYPE), searchRequestArgumentCaptor.capture(), any());
SearchRequest searchRequest = searchRequestArgumentCaptor.getValue();
BoolQueryBuilder boolQuery = (BoolQueryBuilder) searchRequest.source().query();
RangeQueryBuilder rangeQuery = (RangeQueryBuilder) boolQuery.filter().get(1);
assertThat(rangeQuery.from(), is(equalTo(expectedRangeQueryBounds.v1())));
assertThat(rangeQuery.to(), is(equalTo(expectedRangeQueryBounds.v2())));

assertThat(hasChangedHolder.get(), is(equalTo(expectedHasChangedValue)));
assertThat(exceptionHolder.get(), is(nullValue()));
assertThat(hasChangedHolder.get(), is(equalTo(expectedHasChangedValue)));
assertThat(exceptionHolder.get(), is(nullValue()));
} finally {
searchResponse.decRef();
}
}

public void testCreateNextCheckpoint_NoDelay() throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,26 +541,28 @@ protected <Request extends ActionRequest, Response extends ActionResponse> void
&& "the_pit_id+++".equals(searchRequest.pointInTimeBuilder().getEncodedId())) {
listener.onFailure(new SearchContextMissingException(new ShardSearchContextId("sc_missing", 42)));
} else {
SearchResponse response = new SearchResponse(
new SearchHits(new SearchHit[] { new SearchHit(1) }, new TotalHits(1L, TotalHits.Relation.EQUAL_TO), 1.0f),
// Simulate completely null aggs
null,
new Suggest(Collections.emptyList()),
false,
false,
new SearchProfileResults(Collections.emptyMap()),
1,
null,
1,
1,
0,
0,
ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY,
// copy the pit from the request
searchRequest.pointInTimeBuilder() != null ? searchRequest.pointInTimeBuilder().getEncodedId() + "+" : null
ActionListener.respondAndRelease(
listener,
(Response) new SearchResponse(
new SearchHits(new SearchHit[] { new SearchHit(1) }, new TotalHits(1L, TotalHits.Relation.EQUAL_TO), 1.0f),
// Simulate completely null aggs
null,
new Suggest(Collections.emptyList()),
false,
false,
new SearchProfileResults(Collections.emptyMap()),
1,
null,
1,
1,
0,
0,
ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY,
// copy the pit from the request
searchRequest.pointInTimeBuilder() != null ? searchRequest.pointInTimeBuilder().getEncodedId() + "+" : null
)
);
listener.onResponse((Response) response);

}
return;
Expand Down
Loading