Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removing explicit SearchResponse usages in tests - v2 #102021

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -53,6 +52,7 @@
import static org.elasticsearch.common.lucene.uid.Versions.MATCH_DELETED;
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -201,16 +201,18 @@ public void testDeleteByQuery() throws Exception {
// Ensure that the write thread blocking task is currently executing
barrier.await();

final SearchResponse searchResponse = prepareSearch(sourceIndex).setSize(numDocs) // Get all indexed docs
.addSort(SORTING_FIELD, SortOrder.DESC)
.execute()
.actionGet();

// Modify a subset of the target documents concurrently
final List<SearchHit> originalDocs = Arrays.asList(searchResponse.getHits().getHits());
int conflictingOps = randomIntBetween(maxDocs, numDocs);
final List<SearchHit> docsModifiedConcurrently = randomSubsetOf(conflictingOps, originalDocs);

final int finalConflictingOps = conflictingOps;
final List<SearchHit> docsModifiedConcurrently = new ArrayList<>();
assertResponse(
prepareSearch(sourceIndex).setSize(numDocs) // Get all indexed docs
.addSort(SORTING_FIELD, SortOrder.DESC),
response -> {
// Modify a subset of the target documents concurrently
final List<SearchHit> originalDocs = Arrays.asList(response.getHits().getHits());
docsModifiedConcurrently.addAll(randomSubsetOf(finalConflictingOps, originalDocs));
}
);
BulkRequest conflictingUpdatesBulkRequest = new BulkRequest();
for (SearchHit searchHit : docsModifiedConcurrently) {
if (scriptEnabled && searchHit.getSourceAsMap().containsKey(RETURN_NOOP_FIELD)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,26 +111,30 @@ private void dotestBasicsWithRetry(int retries, int minFailures, int maxFailures
}
client.validateRequest(SearchAction.INSTANCE, (SearchRequest r) -> assertTrue(r.allowPartialSearchResults() == Boolean.FALSE));
SearchResponse searchResponse = createSearchResponse();
client.respond(SearchAction.INSTANCE, searchResponse);

for (int i = 0; i < randomIntBetween(1, 10); ++i) {
ScrollableHitSource.AsyncResponse asyncResponse = responses.poll(10, TimeUnit.SECONDS);
assertNotNull(asyncResponse);
assertEquals(responses.size(), 0);
assertSameHits(asyncResponse.response().getHits(), searchResponse.getHits().getHits());
asyncResponse.done(TimeValue.ZERO);

for (int retry = 0; retry < randomIntBetween(minFailures, maxFailures); ++retry) {
client.fail(SearchScrollAction.INSTANCE, new EsRejectedExecutionException());
client.awaitOperation();
++expectedSearchRetries;
try {
client.respond(SearchAction.INSTANCE, searchResponse);

for (int i = 0; i < randomIntBetween(1, 10); ++i) {
ScrollableHitSource.AsyncResponse asyncResponse = responses.poll(10, TimeUnit.SECONDS);
assertNotNull(asyncResponse);
assertEquals(responses.size(), 0);
assertSameHits(asyncResponse.response().getHits(), searchResponse.getHits().getHits());
asyncResponse.done(TimeValue.ZERO);

for (int retry = 0; retry < randomIntBetween(minFailures, maxFailures); ++retry) {
client.fail(SearchScrollAction.INSTANCE, new EsRejectedExecutionException());
client.awaitOperation();
++expectedSearchRetries;
}

searchResponse = createSearchResponse();
client.respond(SearchScrollAction.INSTANCE, searchResponse);
}

searchResponse = createSearchResponse();
client.respond(SearchScrollAction.INSTANCE, searchResponse);
assertEquals(actualSearchRetries.get(), expectedSearchRetries);
} finally {
searchResponse.decRef();
}

assertEquals(actualSearchRetries.get(), expectedSearchRetries);
}

public void testScrollKeepAlive() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1050,53 +1050,65 @@ private static Map<String, Object> duelSearchSync(SearchRequest searchRequest, C
throw new AssertionError("one of the two requests returned an exception", exception2.get());
}
SearchResponse minimizeRoundtripsSearchResponse = minimizeRoundtripsResponse.get();
SearchResponse fanOutSearchResponse = null;
try {
responseChecker.accept(minimizeRoundtripsSearchResponse);

// if only the remote cluster was searched, then only one reduce phase is expected
int expectedReducePhasesMinRoundTrip = 1;
if (searchRequest.indices().length > 1) {
expectedReducePhasesMinRoundTrip = searchRequest.indices().length + 1;
}

responseChecker.accept(minimizeRoundtripsSearchResponse);

// if only the remote cluster was searched, then only one reduce phase is expected
int expectedReducePhasesMinRoundTrip = 1;
if (searchRequest.indices().length > 1) {
expectedReducePhasesMinRoundTrip = searchRequest.indices().length + 1;
}

assertEquals(expectedReducePhasesMinRoundTrip, minimizeRoundtripsSearchResponse.getNumReducePhases());
SearchResponse fanOutSearchResponse = fanOutResponse.get();
responseChecker.accept(fanOutSearchResponse);
assertEquals(1, fanOutSearchResponse.getNumReducePhases());
assertEquals(expectedReducePhasesMinRoundTrip, minimizeRoundtripsSearchResponse.getNumReducePhases());
fanOutSearchResponse = fanOutResponse.get();
responseChecker.accept(fanOutSearchResponse);
assertEquals(1, fanOutSearchResponse.getNumReducePhases());

// compare Clusters objects
SearchResponse.Clusters clustersMRT = minimizeRoundtripsSearchResponse.getClusters();
SearchResponse.Clusters clustersMRTFalse = fanOutSearchResponse.getClusters();
// compare Clusters objects
SearchResponse.Clusters clustersMRT = minimizeRoundtripsSearchResponse.getClusters();
SearchResponse.Clusters clustersMRTFalse = fanOutSearchResponse.getClusters();

assertEquals(clustersMRT.getTotal(), clustersMRTFalse.getTotal());
assertEquals(
clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL),
clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL)
);
assertEquals(
clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED),
clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED)
);
assertEquals(
clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING),
clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING)
);
assertEquals(
clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL),
clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL)
);
assertEquals(
clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.FAILED),
clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.FAILED)
);
assertEquals(clustersMRT.getTotal(), clustersMRTFalse.getTotal());
assertEquals(
clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL),
clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL)
);
assertEquals(
clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED),
clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED)
);
assertEquals(
clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING),
clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING)
);
assertEquals(
clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL),
clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL)
);
assertEquals(
clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.FAILED),
clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.FAILED)
);

Map<String, Object> minimizeRoundtripsResponseMap = responseToMap(minimizeRoundtripsSearchResponse);
if (clustersMRT.hasClusterObjects() && clustersMRTFalse.hasClusterObjects()) {
Map<String, Object> fanOutResponseMap = responseToMap(fanOutSearchResponse);
compareResponseMaps(minimizeRoundtripsResponseMap, fanOutResponseMap, "Comparing sync_search minimizeRoundTrip vs. fanOut");
assertThat(minimizeRoundtripsSearchResponse.getSkippedShards(), lessThanOrEqualTo(fanOutSearchResponse.getSkippedShards()));
Map<String, Object> minimizeRoundtripsResponseMap = responseToMap(minimizeRoundtripsSearchResponse);
if (clustersMRT.hasClusterObjects() && clustersMRTFalse.hasClusterObjects()) {
Map<String, Object> fanOutResponseMap = responseToMap(fanOutSearchResponse);
compareResponseMaps(
minimizeRoundtripsResponseMap,
fanOutResponseMap,
"Comparing sync_search minimizeRoundTrip vs. fanOut"
);
assertThat(
minimizeRoundtripsSearchResponse.getSkippedShards(),
lessThanOrEqualTo(fanOutSearchResponse.getSkippedShards())
);
}
return minimizeRoundtripsResponseMap;
} finally {
if (fanOutSearchResponse != null) fanOutSearchResponse.decRef();
if (minimizeRoundtripsSearchResponse != null) minimizeRoundtripsSearchResponse.decRef();
}
return minimizeRoundtripsResponseMap;
}
}

Expand Down Expand Up @@ -1139,54 +1151,65 @@ private static Map<String, Object> duelSearchAsync(SearchRequest searchRequest,
} finally {
deleteAsyncSearch(fanOutResponse.getId());
}
SearchResponse minimizeRoundtripsSearchResponse = minimizeRoundtripsResponse.getSearchResponse();
SearchResponse fanOutSearchResponse = fanOutResponse.getSearchResponse();
SearchResponse minimizeRoundtripsSearchResponse = null;
SearchResponse fanOutSearchResponse = null;
try {
fanOutSearchResponse = fanOutResponse.getSearchResponse();
minimizeRoundtripsSearchResponse = minimizeRoundtripsResponse.getSearchResponse();

responseChecker.accept(minimizeRoundtripsSearchResponse);
responseChecker.accept(minimizeRoundtripsSearchResponse);

// if only the remote cluster was searched, then only one reduce phase is expected
int expectedReducePhasesMinRoundTrip = 1;
if (searchRequest.indices().length > 1) {
expectedReducePhasesMinRoundTrip = searchRequest.indices().length + 1;
}
assertEquals(expectedReducePhasesMinRoundTrip, minimizeRoundtripsSearchResponse.getNumReducePhases());
// if only the remote cluster was searched, then only one reduce phase is expected
int expectedReducePhasesMinRoundTrip = 1;
if (searchRequest.indices().length > 1) {
expectedReducePhasesMinRoundTrip = searchRequest.indices().length + 1;
}
assertEquals(expectedReducePhasesMinRoundTrip, minimizeRoundtripsSearchResponse.getNumReducePhases());

responseChecker.accept(fanOutSearchResponse);
assertEquals(1, fanOutSearchResponse.getNumReducePhases());
responseChecker.accept(fanOutSearchResponse);
assertEquals(1, fanOutSearchResponse.getNumReducePhases());

// compare Clusters objects
SearchResponse.Clusters clustersMRT = minimizeRoundtripsSearchResponse.getClusters();
SearchResponse.Clusters clustersMRTFalse = fanOutSearchResponse.getClusters();
// compare Clusters objects
SearchResponse.Clusters clustersMRT = minimizeRoundtripsSearchResponse.getClusters();
SearchResponse.Clusters clustersMRTFalse = fanOutSearchResponse.getClusters();

assertEquals(clustersMRT.getTotal(), clustersMRTFalse.getTotal());
assertEquals(
clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL),
clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL)
);
assertEquals(
clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED),
clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED)
);
assertEquals(
clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING),
clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING)
);
assertEquals(
clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL),
clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL)
);
assertEquals(
clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.FAILED),
clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.FAILED)
);
assertEquals(clustersMRT.getTotal(), clustersMRTFalse.getTotal());
assertEquals(
clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL),
clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL)
);
assertEquals(
clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED),
clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED)
);
assertEquals(
clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING),
clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.RUNNING)
);
assertEquals(
clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL),
clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.PARTIAL)
);
assertEquals(
clustersMRT.getClusterStateCount(SearchResponse.Cluster.Status.FAILED),
clustersMRTFalse.getClusterStateCount(SearchResponse.Cluster.Status.FAILED)
);

Map<String, Object> minimizeRoundtripsResponseMap = responseToMap(minimizeRoundtripsSearchResponse);
if (clustersMRT.hasClusterObjects() && clustersMRTFalse.hasClusterObjects()) {
Map<String, Object> fanOutResponseMap = responseToMap(fanOutSearchResponse);
compareResponseMaps(minimizeRoundtripsResponseMap, fanOutResponseMap, "Comparing async_search minimizeRoundTrip vs. fanOut");
assertThat(minimizeRoundtripsSearchResponse.getSkippedShards(), lessThanOrEqualTo(fanOutSearchResponse.getSkippedShards()));
Map<String, Object> minimizeRoundtripsResponseMap = responseToMap(minimizeRoundtripsSearchResponse);
if (clustersMRT.hasClusterObjects() && clustersMRTFalse.hasClusterObjects()) {
Map<String, Object> fanOutResponseMap = responseToMap(fanOutSearchResponse);
compareResponseMaps(
minimizeRoundtripsResponseMap,
fanOutResponseMap,
"Comparing async_search minimizeRoundTrip vs. fanOut"
);
assertThat(minimizeRoundtripsSearchResponse.getSkippedShards(), lessThanOrEqualTo(fanOutSearchResponse.getSkippedShards()));
}
return minimizeRoundtripsResponseMap;
} finally {
if (minimizeRoundtripsSearchResponse != null) minimizeRoundtripsSearchResponse.decRef();
if (fanOutSearchResponse != null) fanOutSearchResponse.decRef();
}
return minimizeRoundtripsResponseMap;
}

private static void compareResponseMaps(Map<String, Object> responseMap1, Map<String, Object> responseMap2, String info) {
Expand Down
Loading