Added decRef calls to SearchResponse tests (#102040)
The tests changed here cover these sections of the ES repo:
x-pack/plugin/monitoring
x-pack/plugin/rank-rrf
x-pack/plugin/rollup
x-pack/plugin/search-business-rules

The decRef call was added either explicitly or implicitly by using ElasticsearchAssertions.assertResponse.
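For context, a minimal sketch of the two patterns (the index name, test method name, and assertion values are placeholders, not code from this commit): the explicit form releases the response in a finally block, while ElasticsearchAssertions.assertResponse runs the request, hands the response to a consumer, and releases it afterwards.

// Hypothetical test method in an ESIntegTestCase-style class; assumes the usual
// static imports (assertResponse from ElasticsearchAssertions, equalTo from Hamcrest).
public void testBothDecRefStyles() {
    // Explicit style: the test owns the SearchResponse and must release it itself.
    SearchResponse explicit = client().prepareSearch("my-index").get();
    try {
        assertThat(explicit.getHits().getTotalHits().value, equalTo(3L));
    } finally {
        explicit.decRef();
    }

    // Implicit style: assertResponse executes the request, passes the response to
    // the consumer, and decrements the reference count once the consumer returns.
    assertResponse(
        client().prepareSearch("my-index"),
        response -> assertThat(response.getHits().getTotalHits().value, equalTo(3L))
    );
}

assertCheckedResponse, used in some hunks below, is the same pattern with a consumer that may throw a checked exception.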
quux00 authored Nov 13, 2023
1 parent b3c3cc1 commit 57ae150
Showing 11 changed files with 1,922 additions and 1,822 deletions.
@@ -10,7 +10,6 @@
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
@@ -59,11 +58,12 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertCheckedResponse;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
import static org.elasticsearch.threadpool.ThreadPool.Names.WRITE;
import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS;
import static org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils.TEMPLATE_VERSION;
@@ -145,40 +145,43 @@ public void testMonitoringBulk() throws Exception {
ensureGreen(monitoringIndex);
assertThat(client().admin().indices().prepareRefresh(monitoringIndex).get().getStatus(), is(RestStatus.OK));

final SearchResponse response = client().prepareSearch(".monitoring-" + system.getSystem() + "-" + TEMPLATE_VERSION + "-*")
.get();
assertResponse(client().prepareSearch(".monitoring-" + system.getSystem() + "-" + TEMPLATE_VERSION + "-*"), response -> {
// exactly 3 results are expected
assertThat("No monitoring documents yet", response.getHits().getTotalHits().value, equalTo(3L));

// exactly 3 results are expected
assertThat("No monitoring documents yet", response.getHits().getTotalHits().value, equalTo(3L));
final List<Map<String, Object>> sources = Arrays.stream(response.getHits().getHits())
.map(SearchHit::getSourceAsMap)
.collect(Collectors.toList());

final List<Map<String, Object>> sources = Arrays.stream(response.getHits().getHits())
.map(SearchHit::getSourceAsMap)
.collect(Collectors.toList());

// find distinct _source.timestamp fields
assertThat(sources.stream().map(source -> source.get("timestamp")).distinct().count(), is(1L));
// find distinct _source.source_node fields (which is a map)
assertThat(sources.stream().map(source -> source.get("source_node")).distinct().count(), is(1L));
// find distinct _source.timestamp fields
assertThat(sources.stream().map(source -> source.get("timestamp")).distinct().count(), is(1L));
// find distinct _source.source_node fields (which is a map)
assertThat(sources.stream().map(source -> source.get("source_node")).distinct().count(), is(1L));
});
});

final SearchResponse response = client().prepareSearch(monitoringIndex).get();
final SearchHits hits = response.getHits();
assertCheckedResponse(client().prepareSearch(monitoringIndex), response -> {
final SearchHits hits = response.getHits();

assertThat(response.getHits().getTotalHits().value, equalTo(3L));
assertThat(
"Monitoring documents must have the same timestamp",
Arrays.stream(hits.getHits()).map(hit -> extractValue("timestamp", hit.getSourceAsMap())).distinct().count(),
equalTo(1L)
);
assertThat(
"Monitoring documents must have the same source_node timestamp",
Arrays.stream(hits.getHits()).map(hit -> extractValue("source_node.timestamp", hit.getSourceAsMap())).distinct().count(),
equalTo(1L)
);
assertThat(response.getHits().getTotalHits().value, equalTo(3L));
assertThat(
"Monitoring documents must have the same timestamp",
Arrays.stream(hits.getHits()).map(hit -> extractValue("timestamp", hit.getSourceAsMap())).distinct().count(),
equalTo(1L)
);
assertThat(
"Monitoring documents must have the same source_node timestamp",
Arrays.stream(hits.getHits())
.map(hit -> extractValue("source_node.timestamp", hit.getSourceAsMap()))
.distinct()
.count(),
equalTo(1L)
);

for (final SearchHit hit : hits.getHits()) {
assertMonitoringDoc(toMap(hit), system, interval);
}
for (final SearchHit hit : hits.getHits()) {
assertMonitoringDoc(toMap(hit), system, interval);
}
});
});
}

@@ -206,30 +209,27 @@ public void testMonitoringService() throws Exception {
assertAcked(clusterAdmin().prepareUpdateSettings().setTransientSettings(settings));

whenExportersAreReady(() -> {
final AtomicReference<SearchResponse> searchResponse = new AtomicReference<>();

assertBusy(() -> {
final SearchResponse response = client().prepareSearch(".monitoring-es-*")
.setCollapse(new CollapseBuilder("type"))
.addSort("timestamp", SortOrder.DESC)
.get();

assertThat(response.status(), is(RestStatus.OK));
assertThat(
"Expecting a minimum number of 6 docs, one per collector",
response.getHits().getHits().length,
greaterThanOrEqualTo(6)
assertCheckedResponse(
client().prepareSearch(".monitoring-es-*")
.setCollapse(new CollapseBuilder("type"))
.addSort("timestamp", SortOrder.DESC),
response -> {
assertThat(response.status(), is(RestStatus.OK));
assertThat(
"Expecting a minimum number of 6 docs, one per collector",
response.getHits().getHits().length,
greaterThanOrEqualTo(6)
);

for (final SearchHit hit : response.getHits()) {
final Map<String, Object> searchHit = toMap(hit);
assertMonitoringDoc(searchHit, MonitoredSystem.ES, MonitoringService.MIN_INTERVAL);
}
}
);

searchResponse.set(response);
});

for (final SearchHit hit : searchResponse.get().getHits()) {
final Map<String, Object> searchHit = toMap(hit);
assertMonitoringDoc(searchHit, MonitoredSystem.ES, MonitoringService.MIN_INTERVAL);
}
});

}

/**
(next changed file)
@@ -6,7 +6,6 @@
*/
package org.elasticsearch.xpack.monitoring;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.Aggregation;
@@ -20,6 +19,7 @@

import static org.elasticsearch.test.NodeRoles.noRoles;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
@@ -78,24 +78,27 @@ public void testMultipleNodes() throws Exception {
flush(ALL_MONITORING_INDICES);
refresh();

SearchResponse response = prepareSearch(ALL_MONITORING_INDICES).setQuery(
QueryBuilders.termQuery("type", NodeStatsMonitoringDoc.TYPE)
).setSize(0).addAggregation(AggregationBuilders.terms("nodes_ids").field("node_stats.node_id")).get();

for (Aggregation aggregation : response.getAggregations()) {
assertThat(aggregation, instanceOf(StringTerms.class));
assertThat(((StringTerms) aggregation).getBuckets().size(), equalTo(nbNodes));

for (String nodeName : internalCluster().getNodeNames()) {
StringTerms.Bucket bucket = ((StringTerms) aggregation).getBucketByKey(
internalCluster().clusterService(nodeName).localNode().getId()
);
// At least 1 doc must exist per node, but it can be more than 1
// because the first node may have already collected many node stats documents
// whereas the last node just started to collect node stats.
assertThat(bucket.getDocCount(), greaterThanOrEqualTo(1L));
assertResponse(
prepareSearch(ALL_MONITORING_INDICES).setQuery(QueryBuilders.termQuery("type", NodeStatsMonitoringDoc.TYPE))
.setSize(0)
.addAggregation(AggregationBuilders.terms("nodes_ids").field("node_stats.node_id")),
response -> {
for (Aggregation aggregation : response.getAggregations()) {
assertThat(aggregation, instanceOf(StringTerms.class));
assertThat(((StringTerms) aggregation).getBuckets().size(), equalTo(nbNodes));

for (String nodeName : internalCluster().getNodeNames()) {
StringTerms.Bucket bucket = ((StringTerms) aggregation).getBucketByKey(
internalCluster().clusterService(nodeName).localNode().getId()
);
// At least 1 doc must exist per node, but it can be more than 1
// because the first node may have already collected many node stats documents
// whereas the last node just started to collect node stats.
assertThat(bucket.getDocCount(), greaterThanOrEqualTo(1L));
}
}
}
}
);
});
}
}
(remaining changed files not shown)
