Skip to content

Commit

Permalink
Add more collectorResult test and work on the PR comments (#9129)
Browse files Browse the repository at this point in the history
Signed-off-by: Ticheng Lin <ticheng@amazon.com>
  • Loading branch information
ticheng-aws committed Aug 12, 2023
1 parent e8352de commit 2bf382d
Show file tree
Hide file tree
Showing 10 changed files with 218 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.search.aggregations.metrics.Stats;
import org.opensearch.search.profile.ProfileResult;
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.profile.query.CollectorResult;
import org.opensearch.search.profile.query.QueryProfileShardResult;
import org.opensearch.test.OpenSearchIntegTestCase;

Expand All @@ -67,8 +68,11 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.containsString;

@OpenSearchIntegTestCase.SuiteScopeTestCase
public class AggregationProfilerIT extends OpenSearchIntegTestCase {
Expand Down Expand Up @@ -150,6 +154,8 @@ public class AggregationProfilerIT extends OpenSearchIntegTestCase {
private static final String TAG_FIELD = "tag";
private static final String STRING_FIELD = "string_field";
private final int numDocs = 5;
private static final String REASON_SEARCH_TOP_HITS = "search_top_hits";
private static final String REASON_AGGREGATION = "aggregation";

@Override
protected int numberOfShards() {
Expand Down Expand Up @@ -217,8 +223,14 @@ public void testSimpleProfile() {
if (histoAggResult.getMaxSliceTime() != null) {
// concurrent segment search enabled
assertThat(breakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2);
}
} else {
assertThat(breakdown.keySet(), equalTo(BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResult(collectorResult, 2);
}
}
assertThat(breakdown.get(INITIALIZE), greaterThan(0L));
assertThat(breakdown.get(COLLECT), greaterThan(0L));
Expand Down Expand Up @@ -265,8 +277,14 @@ public void testMultiLevelProfile() {
if (histoAggResult.getMaxSliceTime() != null) {
// concurrent segment search enabled
assertThat(histoBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2);
}
} else {
assertThat(histoBreakdown.keySet(), equalTo(BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResult(collectorResult, 2);
}
}
assertThat(histoBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(histoBreakdown.get(COLLECT), greaterThan(0L));
Expand Down Expand Up @@ -366,8 +384,14 @@ public void testMultiLevelProfileBreadthFirst() {
if (histoAggResult.getMaxSliceTime() != null) {
// concurrent segment search enabled
assertThat(histoBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2);
}
} else {
assertThat(histoBreakdown.keySet(), equalTo(BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResult(collectorResult, 2);
}
}
assertThat(histoBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(histoBreakdown.get(COLLECT), greaterThan(0L));
Expand Down Expand Up @@ -452,8 +476,14 @@ public void testDiversifiedAggProfile() {
if (diversifyAggResult.getMaxSliceTime() != null) {
// concurrent segment search enabled
assertThat(diversifyBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2);
}
} else {
assertThat(diversifyBreakdown.keySet(), equalTo(BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResult(collectorResult, 2);
}
}
assertThat(diversifyBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(diversifyBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
Expand Down Expand Up @@ -532,8 +562,14 @@ public void testComplexProfile() {
if (histoAggResult.getMaxSliceTime() != null) {
// concurrent segment search enabled
assertThat(histoBreakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2);
}
} else {
assertThat(histoBreakdown.keySet(), equalTo(BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResult(collectorResult, 2);
}
}
assertThat(histoBreakdown.get(INITIALIZE), greaterThan(0L));
assertThat(histoBreakdown.get(BUILD_LEAF_COLLECTOR), greaterThan(0L));
Expand Down Expand Up @@ -792,7 +828,6 @@ public void testGlobalAggWithStatsSubAggregatorProfile() {
.get();

assertSearchResponse(response);

Global global = response.getAggregations().get("global");
assertThat(global, IsNull.notNullValue());
assertThat(global.getName(), equalTo("global"));
Expand Down Expand Up @@ -843,13 +878,106 @@ public void testGlobalAggWithStatsSubAggregatorProfile() {
if (globalAggResult.getMaxSliceTime() != null) {
// concurrent segment search enabled
assertEquals(CONCURRENT_SEARCH_BREAKDOWN_KEYS, breakdown.keySet());
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 0);
}
} else {
assertEquals(BREAKDOWN_KEYS, breakdown.keySet());
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResult(collectorResult, 0);
}
}
assertThat(breakdown.get(INITIALIZE), greaterThan(0L));
assertThat(breakdown.get(COLLECT), greaterThan(0L));
assertThat(breakdown.get(BUILD_AGGREGATION).longValue(), greaterThan(0L));
assertEquals(0, breakdown.get(REDUCE).intValue());
}
}

public void testMultipleAggregationsProfile() {
SearchResponse response = client().prepareSearch("idx")
.setProfile(true)
.addAggregation(histogram("histo_1").field(NUMBER_FIELD).interval(1L))
.addAggregation(histogram("histo_2").field(NUMBER_FIELD).interval(1L))
.get();
assertSearchResponse(response);
Map<String, ProfileShardResult> profileResults = response.getProfileResults();
assertThat(profileResults, notNullValue());
assertThat(profileResults.size(), equalTo(getNumShards("idx").numPrimaries));
for (ProfileShardResult profileShardResult : profileResults.values()) {
assertThat(profileShardResult, notNullValue());
List<QueryProfileShardResult> queryProfilerResults = profileShardResult.getQueryProfileResults();
assertThat(queryProfilerResults, notNullValue());
for (QueryProfileShardResult queryProfilerResult : queryProfilerResults) {
CollectorResult collectorResult = queryProfilerResult.getCollectorResult();
String reason = collectorResult.getReason();
assertThat(reason, equalTo("search_multi"));
List<CollectorResult> children = collectorResult.getProfiledChildren();
assertThat(children.size(), equalTo(2));
assertThat(children.get(1).getName(), containsString("[histo_1, histo_2]"));
}
AggregationProfileShardResult aggProfileResults = profileShardResult.getAggregationProfileResults();
assertThat(aggProfileResults, notNullValue());
List<ProfileResult> aggProfileResultsList = aggProfileResults.getProfileResults();
assertThat(aggProfileResultsList, notNullValue());
assertThat(aggProfileResultsList.size(), equalTo(2));
for (ProfileResult histoAggResult : aggProfileResultsList) {
assertThat(histoAggResult, notNullValue());
assertThat(histoAggResult.getQueryName(), equalTo("NumericHistogramAggregator"));
assertThat(histoAggResult.getLuceneDescription(), containsString("histo_"));
assertThat(histoAggResult.getProfiledChildren().size(), equalTo(0));
assertThat(histoAggResult.getTime(), greaterThan(0L));
Map<String, Long> breakdown = histoAggResult.getTimeBreakdown();
assertThat(breakdown, notNullValue());
if (histoAggResult.getMaxSliceTime() != null) {
// concurrent segment search enabled
assertThat(breakdown.keySet(), equalTo(CONCURRENT_SEARCH_BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResultWithConcurrentSearchEnabled(collectorResult, 2);
}
} else {
assertThat(breakdown.keySet(), equalTo(BREAKDOWN_KEYS));
for (QueryProfileShardResult collectorResult : profileShardResult.getQueryProfileResults()) {
assertCollectorResult(collectorResult, 2);
}
}
assertThat(breakdown.get(INITIALIZE), greaterThan(0L));
assertThat(breakdown.get(COLLECT), greaterThan(0L));
assertThat(breakdown.get(BUILD_AGGREGATION).longValue(), greaterThan(0L));
assertThat(breakdown.get(REDUCE), equalTo(0L));
Map<String, Object> debug = histoAggResult.getDebugInfo();
assertThat(debug, notNullValue());
assertThat(debug.keySet(), equalTo(Set.of(TOTAL_BUCKETS)));
assertThat(((Number) debug.get(TOTAL_BUCKETS)).longValue(), greaterThan(0L));
}
}
}

private void assertCollectorResult(QueryProfileShardResult collectorResult, int expectedChildrenCount) {
long nodeTime = collectorResult.getCollectorResult().getTime();
assertThat(collectorResult.getCollectorResult().getMaxSliceTime(), equalTo(nodeTime));
assertThat(collectorResult.getCollectorResult().getMinSliceTime(), equalTo(nodeTime));
assertThat(collectorResult.getCollectorResult().getAvgSliceTime(), equalTo(nodeTime));
assertThat(collectorResult.getCollectorResult().getReduceTime(), equalTo(0L));
assertThat(collectorResult.getCollectorResult().getSliceCount(), equalTo(1));
assertThat(collectorResult.getCollectorResult().getProfiledChildren().size(), equalTo(expectedChildrenCount));
if (expectedChildrenCount == 2) {
assertThat(collectorResult.getCollectorResult().getProfiledChildren().get(0).getReason(), equalTo(REASON_SEARCH_TOP_HITS));
assertThat(collectorResult.getCollectorResult().getProfiledChildren().get(1).getReason(), equalTo(REASON_AGGREGATION));
}
}

private void assertCollectorResultWithConcurrentSearchEnabled(QueryProfileShardResult collectorResult, int expectedChildrenCount) {
long nodeTime = collectorResult.getCollectorResult().getTime();
assertThat(collectorResult.getCollectorResult().getMaxSliceTime(), lessThanOrEqualTo(nodeTime));
assertThat(collectorResult.getCollectorResult().getMinSliceTime(), lessThanOrEqualTo(nodeTime));
assertThat(collectorResult.getCollectorResult().getAvgSliceTime(), lessThanOrEqualTo(nodeTime));
assertThat(collectorResult.getCollectorResult().getReduceTime(), greaterThan(0L));
assertThat(collectorResult.getCollectorResult().getSliceCount(), greaterThanOrEqualTo(1));
assertThat(collectorResult.getCollectorResult().getProfiledChildren().size(), equalTo(expectedChildrenCount));
if (expectedChildrenCount == 2) {
assertThat(collectorResult.getCollectorResult().getProfiledChildren().get(0).getReason(), equalTo(REASON_SEARCH_TOP_HITS));
assertThat(collectorResult.getCollectorResult().getProfiledChildren().get(1).getReason(), equalTo(REASON_AGGREGATION));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,19 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/**
* Common {@link CollectorManager} used by both concurrent and non-concurrent aggregation path and also for global and non-global
* aggregation operators
*
* @opensearch.internal
*/
class AggregationCollectorManager implements CollectorManager<Collector, ReduceableSearchResult> {
public abstract class AggregationCollectorManager implements CollectorManager<Collector, ReduceableSearchResult> {
protected final SearchContext context;
private final CheckedFunction<SearchContext, List<Aggregator>, IOException> aggProvider;
private final String collectorReason;
protected String collectorName;

AggregationCollectorManager(
SearchContext context,
Expand All @@ -51,6 +53,8 @@ public String getCollectorReason() {
return collectorReason;
}

public abstract String getCollectorName();

@Override
public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IOException {
final List<Aggregator> aggregators = context.bucketCollectorProcessor().toAggregators(collectors);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.apache.lucene.search.Query;
import org.opensearch.common.lucene.search.Queries;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.profile.query.CollectorResult;
import org.opensearch.search.profile.query.InternalProfileCollectorManager;
import org.opensearch.search.profile.query.InternalProfileComponent;
import org.opensearch.search.query.QueryPhaseExecutionException;
Expand Down Expand Up @@ -68,7 +67,7 @@ public void postProcess(SearchContext context) {
if (context.getProfilers() != null) {
globalCollectorManager = new InternalProfileCollectorManager(
globalCollectorManager,
CollectorResult.REASON_AGGREGATION_GLOBAL,
((AggregationCollectorManager) globalCollectorManager).getCollectorReason(),
Collections.emptyList()
);
context.getProfilers().addQueryProfiler().setCollector((InternalProfileComponent) globalCollectorManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class GlobalAggCollectorManager extends AggregationCollectorManager {
public GlobalAggCollectorManager(SearchContext context) throws IOException {
super(context, context.aggregations().factories()::createTopLevelGlobalAggregators, CollectorResult.REASON_AGGREGATION_GLOBAL);
collector = Objects.requireNonNull(super.newCollector(), "collector instance is null");
collectorName = collector.toString();
}

@Override
Expand All @@ -48,4 +49,9 @@ protected AggregationReduceableSearchResult buildAggregationResult(InternalAggre
InternalAggregations.reduce(Collections.singletonList(internalAggregations), context.partialOnShard())
);
}

@Override
public String getCollectorName() {
return collectorName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class GlobalAggCollectorManagerWithSingleCollector extends AggregationCol
public GlobalAggCollectorManagerWithSingleCollector(SearchContext context) throws IOException {
super(context, context.aggregations().factories()::createTopLevelGlobalAggregators, CollectorResult.REASON_AGGREGATION_GLOBAL);
collector = Objects.requireNonNull(super.newCollector(), "collector instance is null");
collectorName = collector.toString();
}

@Override
Expand All @@ -42,4 +43,9 @@ public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IO
assert collectors.isEmpty() : "Reduce on GlobalAggregationCollectorManagerWithCollector called with non-empty collectors";
return super.reduce(List.of(collector));
}

@Override
public String getCollectorName() {
return collectorName;

Check warning on line 49 in server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManagerWithSingleCollector.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManagerWithSingleCollector.java#L49

Added line #L49 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class NonGlobalAggCollectorManager extends AggregationCollectorManager {
public NonGlobalAggCollectorManager(SearchContext context) throws IOException {
super(context, context.aggregations().factories()::createTopLevelNonGlobalAggregators, CollectorResult.REASON_AGGREGATION);
collector = Objects.requireNonNull(super.newCollector(), "collector instance is null");
collectorName = collector.toString();
}

@Override
Expand All @@ -48,4 +49,9 @@ protected AggregationReduceableSearchResult buildAggregationResult(InternalAggre
InternalAggregations.reduce(Collections.singletonList(internalAggregations), context.partialOnShard())
);
}

@Override
public String getCollectorName() {
return collectorName;

Check warning on line 55 in server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManager.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManager.java#L55

Added line #L55 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class NonGlobalAggCollectorManagerWithSingleCollector extends Aggregation
public NonGlobalAggCollectorManagerWithSingleCollector(SearchContext context) throws IOException {
super(context, context.aggregations().factories()::createTopLevelNonGlobalAggregators, CollectorResult.REASON_AGGREGATION);
collector = Objects.requireNonNull(super.newCollector(), "collector instance is null");
collectorName = collector.toString();
}

@Override
Expand All @@ -42,4 +43,9 @@ public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IO
assert collectors.isEmpty() : "Reduce on NonGlobalAggregationCollectorManagerWithCollector called with non-empty collectors";
return super.reduce(List.of(collector));
}

@Override
public String getCollectorName() {
return collectorName;

Check warning on line 49 in server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManagerWithSingleCollector.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManagerWithSingleCollector.java#L49

Added line #L49 was not covered by tests
}
}
Loading

0 comments on commit 2bf382d

Please sign in to comment.