diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index c55c2ab683b11..53bb799dd6c5b 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -235,7 +235,7 @@ import org.opensearch.search.aggregations.support.AggregationUsageService; import org.opensearch.search.backpressure.SearchBackpressureService; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; -import org.opensearch.search.deciders.ConcurrentSearchDecider; +import org.opensearch.search.deciders.ConcurrentSearchRequestDecider; import org.opensearch.search.fetch.FetchPhase; import org.opensearch.search.pipeline.SearchPipelineService; import org.opensearch.search.query.QueryPhase; @@ -1341,7 +1341,7 @@ protected Node( circuitBreakerService, searchModule.getIndexSearcherExecutor(threadPool), taskResourceTrackingService, - searchModule.getConcurrentSearchDeciders() + searchModule.getConcurrentSearchRequestDeciderFactories() ); final List> tasksExecutors = pluginsService.filterPlugins(PersistentTaskPlugin.class) @@ -2000,7 +2000,7 @@ protected SearchService newSearchService( CircuitBreakerService circuitBreakerService, Executor indexSearcherExecutor, TaskResourceTrackingService taskResourceTrackingService, - Collection concurrentSearchDecidersList + Collection> concurrentSearchDeciderFactories ) { return new SearchService( clusterService, @@ -2014,7 +2014,7 @@ protected SearchService newSearchService( circuitBreakerService, indexSearcherExecutor, taskResourceTrackingService, - concurrentSearchDecidersList + concurrentSearchDeciderFactories ); } diff --git a/server/src/main/java/org/opensearch/plugins/SearchPlugin.java b/server/src/main/java/org/opensearch/plugins/SearchPlugin.java index 498da4042fa33..710dd32371f83 100644 --- a/server/src/main/java/org/opensearch/plugins/SearchPlugin.java +++ b/server/src/main/java/org/opensearch/plugins/SearchPlugin.java @@ -65,7 +65,7 @@ import org.opensearch.search.aggregations.pipeline.MovAvgPipelineAggregator; import org.opensearch.search.aggregations.pipeline.PipelineAggregator; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; -import org.opensearch.search.deciders.ConcurrentSearchDecider; +import org.opensearch.search.deciders.ConcurrentSearchRequestDecider; import org.opensearch.search.fetch.FetchSubPhase; import org.opensearch.search.fetch.subphase.highlight.Highlighter; import org.opensearch.search.query.QueryPhaseSearcher; @@ -141,12 +141,12 @@ default Map getHighlighters() { } /** - * Allows plugins to register custom decider for concurrent search - * @return A {@link ConcurrentSearchDecider} + * Allows plugins to register a factory to create custom decider for concurrent search + * @return A {@link ConcurrentSearchRequestDecider.Factory} */ @ExperimentalApi - default ConcurrentSearchDecider getConcurrentSearchDecider() { - return null; + default Optional getConcurrentSearchRequestDeciderFactory() { + return Optional.empty(); } /** diff --git a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java index 1706c27ccf922..945235717cd20 100644 --- a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java @@ -72,8 +72,8 @@ import org.opensearch.search.aggregations.SearchContextAggregations; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.collapse.CollapseContext; -import org.opensearch.search.deciders.ConcurrentSearchDecider; import org.opensearch.search.deciders.ConcurrentSearchDecision; +import org.opensearch.search.deciders.ConcurrentSearchRequestDecider; import org.opensearch.search.deciders.ConcurrentSearchVisitor; import org.opensearch.search.dfs.DfsSearchResult; import org.opensearch.search.fetch.FetchPhase; @@ -106,13 +106,14 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.Executor; import java.util.function.Function; import java.util.function.LongSupplier; -import java.util.stream.Collectors; import static org.opensearch.search.SearchService.CARDINALITY_AGGREGATION_PRUNING_THRESHOLD; import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE; @@ -137,7 +138,7 @@ final class DefaultSearchContext extends SearchContext { private final ShardSearchRequest request; private final SearchShardTarget shardTarget; private final LongSupplier relativeTimeSupplier; - private final Collection concurrentSearchDeciders; + private final Collection> concurrentSearchDeciderFactories; private SearchType searchType; private final BigArrays bigArrays; private final IndexShard indexShard; @@ -223,7 +224,7 @@ final class DefaultSearchContext extends SearchContext { boolean validate, Executor executor, Function requestToAggReduceContextBuilder, - Collection concurrentSearchDeciders + Collection> concurrentSearchDeciderFactories ) throws IOException { this.readerContext = readerContext; this.request = request; @@ -267,7 +268,7 @@ final class DefaultSearchContext extends SearchContext { this.maxAggRewriteFilters = evaluateFilterRewriteSetting(); this.cardinalityAggregationPruningThreshold = evaluateCardinalityAggregationPruningThreshold(); - this.concurrentSearchDeciders = concurrentSearchDeciders; + this.concurrentSearchDeciderFactories = concurrentSearchDeciderFactories; this.keywordIndexOrDocValuesEnabled = evaluateKeywordIndexOrDocValuesEnabled(); } @@ -932,14 +933,27 @@ public boolean shouldUseConcurrentSearch() { private boolean evaluateAutoMode() { - // filter out deciders that want to opt-out of decision-making - final Set filteredDeciders = concurrentSearchDeciders.stream() - .filter(concurrentSearchDecider -> concurrentSearchDecider.canEvaluateForIndex(indexService.getIndexSettings())) - .collect(Collectors.toSet()); + final Set concurrentSearchRequestDeciders = new HashSet<>(); + + // create the ConcurrentSearchRequestDeciders using registered factories + for (Optional deciderFactory : concurrentSearchDeciderFactories) { + if (deciderFactory != null && deciderFactory.isPresent()) { + final ConcurrentSearchRequestDecider.Factory factory = deciderFactory.get(); + final Optional concurrentSearchRequestDecider = factory.create( + indexService.getIndexSettings() + ); + if (concurrentSearchRequestDecider != null + && concurrentSearchRequestDecider.isPresent() + && concurrentSearchRequestDecider.get() != null) { + concurrentSearchRequestDeciders.add(concurrentSearchRequestDecider.get()); + } + } + } + // evaluate based on concurrent search query visitor - if (filteredDeciders.size() > 0) { + if (concurrentSearchRequestDeciders.size() > 0) { ConcurrentSearchVisitor concurrentSearchVisitor = new ConcurrentSearchVisitor( - filteredDeciders, + concurrentSearchRequestDeciders, indexService.getIndexSettings() ); if (request().source() != null && request().source().query() != null) { @@ -949,7 +963,7 @@ private boolean evaluateAutoMode() { } final List decisions = new ArrayList<>(); - for (ConcurrentSearchDecider decider : filteredDeciders) { + for (ConcurrentSearchRequestDecider decider : concurrentSearchRequestDeciders) { ConcurrentSearchDecision decision = decider.getConcurrentSearchDecision(); if (decision != null) { if (logger.isDebugEnabled()) { diff --git a/server/src/main/java/org/opensearch/search/SearchModule.java b/server/src/main/java/org/opensearch/search/SearchModule.java index c51004a1ea95e..b50165efc7fbb 100644 --- a/server/src/main/java/org/opensearch/search/SearchModule.java +++ b/server/src/main/java/org/opensearch/search/SearchModule.java @@ -239,7 +239,7 @@ import org.opensearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder; import org.opensearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; -import org.opensearch.search.deciders.ConcurrentSearchDecider; +import org.opensearch.search.deciders.ConcurrentSearchRequestDecider; import org.opensearch.search.fetch.FetchPhase; import org.opensearch.search.fetch.FetchSubPhase; import org.opensearch.search.fetch.subphase.ExplainPhase; @@ -318,7 +318,7 @@ public class SearchModule { private final QueryPhaseSearcher queryPhaseSearcher; private final SearchPlugin.ExecutorServiceProvider indexSearcherExecutorProvider; - private final Collection concurrentSearchDeciders; + private final Collection> concurrentSearchDeciderFactories; /** * Constructs a new SearchModule object @@ -348,25 +348,25 @@ public SearchModule(Settings settings, List plugins) { queryPhaseSearcher = registerQueryPhaseSearcher(plugins); indexSearcherExecutorProvider = registerIndexSearcherExecutorProvider(plugins); namedWriteables.addAll(SortValue.namedWriteables()); - concurrentSearchDeciders = registerConcurrentSearchDeciders(plugins); + concurrentSearchDeciderFactories = registerConcurrentSearchDeciders(plugins); } - private Collection registerConcurrentSearchDeciders(List plugins) { - List concurrentSearchDeciders = new ArrayList<>(); + private Collection> registerConcurrentSearchDeciders(List plugins) { + List> concurrentSearchDeciders = new ArrayList<>(); for (SearchPlugin plugin : plugins) { - ConcurrentSearchDecider decider = plugin.getConcurrentSearchDecider(); - if (decider != null) { - concurrentSearchDeciders.add(decider); + final Optional deciderFactory = plugin.getConcurrentSearchRequestDeciderFactory(); + if (deciderFactory != null && deciderFactory.isPresent()) { + concurrentSearchDeciders.add(deciderFactory); } } return concurrentSearchDeciders; } /** - * Returns the concurrent search deciders that the plugins have registered + * Returns the concurrent search decider factories that the plugins have registered */ - public Collection getConcurrentSearchDeciders() { - return concurrentSearchDeciders; + public Collection> getConcurrentSearchRequestDeciderFactories() { + return concurrentSearchDeciderFactories; } public List getNamedWriteables() { diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index e2a804a674d8f..f15a9be87049c 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -104,7 +104,7 @@ import org.opensearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.collapse.CollapseContext; -import org.opensearch.search.deciders.ConcurrentSearchDecider; +import org.opensearch.search.deciders.ConcurrentSearchRequestDecider; import org.opensearch.search.dfs.DfsPhase; import org.opensearch.search.dfs.DfsSearchResult; import org.opensearch.search.fetch.FetchPhase; @@ -364,7 +364,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv private final QueryPhase queryPhase; private final FetchPhase fetchPhase; - private final Collection concurrentSearchDeciders; + private final Collection> concurrentSearchDeciderFactories; private volatile long defaultKeepAlive; @@ -410,7 +410,7 @@ public SearchService( CircuitBreakerService circuitBreakerService, Executor indexSearcherExecutor, TaskResourceTrackingService taskResourceTrackingService, - Collection concurrentSearchDeciders + Collection> concurrentSearchDeciderFactories ) { Settings settings = clusterService.getSettings(); this.threadPool = threadPool; @@ -466,7 +466,7 @@ public SearchService( allowDerivedField = CLUSTER_ALLOW_DERIVED_FIELD_SETTING.get(settings); clusterService.getClusterSettings().addSettingsUpdateConsumer(CLUSTER_ALLOW_DERIVED_FIELD_SETTING, this::setAllowDerivedField); - this.concurrentSearchDeciders = concurrentSearchDeciders; + this.concurrentSearchDeciderFactories = concurrentSearchDeciderFactories; } private void validateKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) { @@ -1167,7 +1167,7 @@ private DefaultSearchContext createSearchContext(ReaderContext reader, ShardSear validate, indexSearcherExecutor, this::aggReduceContextBuilder, - concurrentSearchDeciders + concurrentSearchDeciderFactories ); // we clone the query shard context here just for rewriting otherwise we // might end up with incorrect state since we are using now() or script services diff --git a/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchDecision.java b/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchDecision.java index 2a30413eff9c8..4ac47221856d1 100644 --- a/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchDecision.java +++ b/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchDecision.java @@ -13,7 +13,7 @@ import java.util.Collection; /** - * This Class defines the decisions that a {@link ConcurrentSearchDecider#getConcurrentSearchDecision} can return. + * This Class defines the decisions that a {@link ConcurrentSearchRequestDecider#getConcurrentSearchDecision} can return. * */ @ExperimentalApi diff --git a/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchDecider.java b/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchRequestDecider.java similarity index 51% rename from server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchDecider.java rename to server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchRequestDecider.java index 9c588bb45b4ec..3251c318c6e9d 100644 --- a/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchDecider.java +++ b/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchRequestDecider.java @@ -12,17 +12,21 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.query.QueryBuilder; +import java.util.Optional; + /** - * {@link ConcurrentSearchDecider} allows pluggable way to evaluate if a query in the search request + * {@link ConcurrentSearchRequestDecider} allows pluggable way to evaluate if a query in the search request * can use concurrent segment search using the passed in queryBuilders from query tree and index settings * on a per shard request basis. - * Implementations can also opt out of the evaluation process for certain indices based on the index settings. - * For all the deciders which can evaluate query tree for an index, its evaluateForQuery method - * will be called for each node in the query tree. After traversing of the query tree is completed, the final - * decision from the deciders will be obtained using {@link ConcurrentSearchDecider#getConcurrentSearchDecision} + * Implementations will need to implement the Factory interface that can be used to create the ConcurrentSearchRequestDecider + * This factory will be called on each shard search request to create the ConcurrentSearchRequestDecider and get the + * concurrent search decision from the created decider on a per-request basis. + * For all the deciders the evaluateForQuery method will be called for each node in the query tree. + * After traversing of the query tree is completed, the final decision from the deciders will be + * obtained using {@link ConcurrentSearchRequestDecider#getConcurrentSearchDecision} */ @ExperimentalApi -public abstract class ConcurrentSearchDecider { +public abstract class ConcurrentSearchRequestDecider { /** * Evaluate for the passed in queryBuilder node in the query tree of the search request @@ -31,14 +35,6 @@ public abstract class ConcurrentSearchDecider { */ public abstract void evaluateForQuery(QueryBuilder queryBuilder, IndexSettings indexSettings); - /** - * Provides a way for deciders to opt out of decision-making process for certain requests based on - * index settings. - * Return true if interested in decision making for index, - * false, otherwise - */ - public abstract boolean canEvaluateForIndex(IndexSettings indexSettings); - /** * Provide the final decision for concurrent search based on all evaluations * Plugins may need to maintain internal state of evaluations to provide a final decision @@ -47,4 +43,15 @@ public abstract class ConcurrentSearchDecider { */ public abstract ConcurrentSearchDecision getConcurrentSearchDecision(); + /** + * Factory interface that can be implemented to create the ConcurrentSearchRequestDecider object. + * Implementations can use the passed in indexSettings to decide whether to create the decider object or not. + */ + @ExperimentalApi + public interface Factory { + default Optional create(IndexSettings indexSettings) { + return Optional.empty(); + } + } + } diff --git a/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchVisitor.java b/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchVisitor.java index 12ba1b2a9cc5f..d1a4fa982dc7e 100644 --- a/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchVisitor.java +++ b/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchVisitor.java @@ -19,15 +19,15 @@ /** * Class to traverse the QueryBuilder tree and invoke the - * {@link ConcurrentSearchDecider#evaluateForQuery} at each node of the query tree + * {@link ConcurrentSearchRequestDecider#evaluateForQuery} at each node of the query tree */ @ExperimentalApi public class ConcurrentSearchVisitor implements QueryBuilderVisitor { - private final Set deciders; + private final Set deciders; private final IndexSettings indexSettings; - public ConcurrentSearchVisitor(Set concurrentSearchVisitorDeciders, IndexSettings idxSettings) { + public ConcurrentSearchVisitor(Set concurrentSearchVisitorDeciders, IndexSettings idxSettings) { Objects.requireNonNull(concurrentSearchVisitorDeciders, "Concurrent search deciders cannot be null"); deciders = concurrentSearchVisitorDeciders; indexSettings = idxSettings; diff --git a/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java b/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java index 7e213218eb97b..29cdd891e97ca 100644 --- a/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java +++ b/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java @@ -76,8 +76,8 @@ import org.opensearch.search.aggregations.MultiBucketConsumerService; import org.opensearch.search.aggregations.SearchContextAggregations; import org.opensearch.search.builder.SearchSourceBuilder; -import org.opensearch.search.deciders.ConcurrentSearchDecider; import org.opensearch.search.deciders.ConcurrentSearchDecision; +import org.opensearch.search.deciders.ConcurrentSearchRequestDecider; import org.opensearch.search.internal.AliasFilter; import org.opensearch.search.internal.LegacyReaderContext; import org.opensearch.search.internal.PitReaderContext; @@ -96,6 +96,8 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -984,14 +986,27 @@ protected Engine.Searcher acquireSearcherInternal(String source) { // Case4: multiple deciders are registered and all of them opt out of decision-making // with supported agg query so concurrent path is used - ConcurrentSearchDecider decider1 = mock(ConcurrentSearchDecider.class); - when(decider1.canEvaluateForIndex(any())).thenReturn(false); - ConcurrentSearchDecider decider2 = mock(ConcurrentSearchDecider.class); - when(decider2.canEvaluateForIndex(any())).thenReturn(false); + ConcurrentSearchRequestDecider decider1 = mock(ConcurrentSearchRequestDecider.class); - Collection concurrentSearchDeciders = new ArrayList<>(); - concurrentSearchDeciders.add(decider1); - concurrentSearchDeciders.add(decider2); + ConcurrentSearchRequestDecider decider2 = mock(ConcurrentSearchRequestDecider.class); + + ConcurrentSearchRequestDecider.Factory factory1 = new ConcurrentSearchRequestDecider.Factory() { + @Override + public Optional create(IndexSettings indexSettings) { + return Optional.ofNullable(decider1); + } + }; + + Optional factory2 = Optional.of(new ConcurrentSearchRequestDecider.Factory() { + @Override + public Optional create(IndexSettings indexSettings) { + return Optional.ofNullable(decider2); + } + }); + + List> concurrentSearchRequestDeciders = new ArrayList<>(); + concurrentSearchRequestDeciders.add(Optional.of(factory1)); + concurrentSearchRequestDeciders.add(factory2); context = new DefaultSearchContext( readerContext, @@ -1007,7 +1022,7 @@ protected Engine.Searcher acquireSearcherInternal(String source) { false, executor, null, - concurrentSearchDeciders + concurrentSearchRequestDeciders ); // create a supported agg operation context.aggregations(mockAggregations); @@ -1021,15 +1036,9 @@ protected Engine.Searcher acquireSearcherInternal(String source) { // Case5: multiple deciders are registered and one of them returns ConcurrentSearchDecision.DecisionStatus.NO // use non-concurrent path even if query contains supported agg - when(decider1.canEvaluateForIndex(any())).thenReturn(true); when(decider1.getConcurrentSearchDecision()).thenReturn( new ConcurrentSearchDecision(ConcurrentSearchDecision.DecisionStatus.NO, "disable concurrent search") ); - when(decider2.canEvaluateForIndex(any())).thenReturn(false); - - concurrentSearchDeciders.clear(); - concurrentSearchDeciders.add(decider1); - concurrentSearchDeciders.add(decider2); // create a source so that query tree is parsed by visitor SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); @@ -1051,7 +1060,7 @@ protected Engine.Searcher acquireSearcherInternal(String source) { false, executor, null, - concurrentSearchDeciders + concurrentSearchRequestDeciders ); // create a supported agg operation @@ -1067,20 +1076,17 @@ protected Engine.Searcher acquireSearcherInternal(String source) { // Case6: multiple deciders are registered and first decider returns ConcurrentSearchDecision.DecisionStatus.YES // while second decider returns ConcurrentSearchDecision.DecisionStatus.NO // use non-concurrent path even if query contains supported agg - when(decider1.canEvaluateForIndex(any())).thenReturn(true); + when(decider1.getConcurrentSearchDecision()).thenReturn( new ConcurrentSearchDecision(ConcurrentSearchDecision.DecisionStatus.YES, "enable concurrent search") ); - when(decider2.canEvaluateForIndex(any())).thenReturn(true); + when(decider2.getConcurrentSearchDecision()).thenReturn( new ConcurrentSearchDecision(ConcurrentSearchDecision.DecisionStatus.NO, "disable concurrent search") ); - concurrentSearchDeciders.clear(); - concurrentSearchDeciders.add(decider1); - concurrentSearchDeciders.add(decider2); - // create a source so that query tree is parsed by visitor + when(shardSearchRequest.source()).thenReturn(sourceBuilder); context = new DefaultSearchContext( readerContext, @@ -1096,7 +1102,7 @@ protected Engine.Searcher acquireSearcherInternal(String source) { false, executor, null, - concurrentSearchDeciders + concurrentSearchRequestDeciders ); // create a supported agg operation @@ -1111,22 +1117,19 @@ protected Engine.Searcher acquireSearcherInternal(String source) { // Case7: multiple deciders are registered and all return ConcurrentSearchDecision.DecisionStatus.NO_OP // but un-supported agg query is present, use non-concurrent path - when(decider1.canEvaluateForIndex(any())).thenReturn(true); + when(decider1.getConcurrentSearchDecision()).thenReturn( new ConcurrentSearchDecision(ConcurrentSearchDecision.DecisionStatus.NO_OP, "noop") ); - when(decider2.canEvaluateForIndex(any())).thenReturn(true); + when(decider2.getConcurrentSearchDecision()).thenReturn( new ConcurrentSearchDecision(ConcurrentSearchDecision.DecisionStatus.NO_OP, "noop") ); when(mockAggregations.factories().allFactoriesSupportConcurrentSearch()).thenReturn(false); - concurrentSearchDeciders.clear(); - concurrentSearchDeciders.add(decider1); - concurrentSearchDeciders.add(decider2); - // create a source so that query tree is parsed by visitor + when(shardSearchRequest.source()).thenReturn(sourceBuilder); context = new DefaultSearchContext( readerContext, @@ -1142,7 +1145,7 @@ protected Engine.Searcher acquireSearcherInternal(String source) { false, executor, null, - concurrentSearchDeciders + concurrentSearchRequestDeciders ); // create a supported agg operation diff --git a/server/src/test/java/org/opensearch/search/SearchModuleTests.java b/server/src/test/java/org/opensearch/search/SearchModuleTests.java index 71bbe7c718f47..357ed3bc96e62 100644 --- a/server/src/test/java/org/opensearch/search/SearchModuleTests.java +++ b/server/src/test/java/org/opensearch/search/SearchModuleTests.java @@ -41,6 +41,7 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.index.IndexSettings; import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryRewriteContext; import org.opensearch.index.query.QueryShardContext; @@ -69,7 +70,7 @@ import org.opensearch.search.aggregations.support.ValuesSourceConfig; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; import org.opensearch.search.aggregations.support.ValuesSourceType; -import org.opensearch.search.deciders.ConcurrentSearchDecider; +import org.opensearch.search.deciders.ConcurrentSearchRequestDecider; import org.opensearch.search.fetch.FetchSubPhase; import org.opensearch.search.fetch.subphase.ExplainPhase; import org.opensearch.search.fetch.subphase.highlight.CustomHighlighter; @@ -501,12 +502,12 @@ public Optional getIndexSearcherExecutorProvider() { expectThrows(IllegalStateException.class, () -> new SearchModule(Settings.EMPTY, searchPlugins)); } - public void testRegisterConcurrentSearchDecidersNoExternalPlugins() { + public void testRegisterConcurrentSearchRequestDecidersNoExternalPlugins() { SearchModule searchModule = new SearchModule(Settings.EMPTY, Collections.emptyList()); - assertEquals(searchModule.getConcurrentSearchDeciders().size(), 0); + assertEquals(searchModule.getConcurrentSearchRequestDeciderFactories().size(), 0); } - public void testRegisterConcurrentSearchDecidersExternalPluginsWithNoDeciders() { + public void testRegisterConcurrentSearchRequestDecidersExternalPluginsWithNoDeciders() { SearchPlugin plugin1 = new SearchPlugin() { @Override public Optional getIndexSearcherExecutorProvider() { @@ -521,10 +522,10 @@ public Optional getIndexSearcherExecutorProvider() { searchPlugins.add(plugin2); SearchModule searchModule = new SearchModule(Settings.EMPTY, searchPlugins); - assertEquals(searchModule.getConcurrentSearchDeciders().size(), 0); + assertEquals(searchModule.getConcurrentSearchRequestDeciderFactories().size(), 0); } - public void testRegisterConcurrentSearchDecidersExternalPluginsWithDeciders() { + public void testRegisterConcurrentSearchRequestDecidersExternalPluginsWithDeciders() { SearchPlugin pluginDecider1 = new SearchPlugin() { @Override public Optional getIndexSearcherExecutorProvider() { @@ -532,15 +533,25 @@ public Optional getIndexSearcherExecutorProvider() { } @Override - public ConcurrentSearchDecider getConcurrentSearchDecider() { - return mock(ConcurrentSearchDecider.class); + public Optional getConcurrentSearchRequestDeciderFactory() { + return Optional.of(new ConcurrentSearchRequestDecider.Factory() { + @Override + public Optional create(IndexSettings indexSettings) { + return Optional.of(mock(ConcurrentSearchRequestDecider.class)); + } + }); } }; SearchPlugin pluginDecider2 = new SearchPlugin() { @Override - public ConcurrentSearchDecider getConcurrentSearchDecider() { - return mock(ConcurrentSearchDecider.class); + public Optional getConcurrentSearchRequestDeciderFactory() { + return Optional.of(new ConcurrentSearchRequestDecider.Factory() { + @Override + public Optional create(IndexSettings indexSettings) { + return Optional.of(mock(ConcurrentSearchRequestDecider.class)); + } + }); } }; @@ -549,13 +560,13 @@ public ConcurrentSearchDecider getConcurrentSearchDecider() { searchPlugins.add(pluginDecider2); SearchModule searchModule = new SearchModule(Settings.EMPTY, searchPlugins); - assertEquals(searchModule.getConcurrentSearchDeciders().size(), 2); + assertEquals(searchModule.getConcurrentSearchRequestDeciderFactories().size(), 2); } - public void testRegisterConcurrentSearchDecidersPluginWithNullDecider() { + public void testRegisterConcurrentSearchRequestDecidersPluginWithNullDecider() { SearchPlugin pluginWithNullDecider = new SearchPlugin() { @Override - public ConcurrentSearchDecider getConcurrentSearchDecider() { + public Optional getConcurrentSearchRequestDeciderFactory() { return null; } }; @@ -564,7 +575,7 @@ public ConcurrentSearchDecider getConcurrentSearchDecider() { searchPlugins.add(pluginWithNullDecider); SearchModule searchModule = new SearchModule(Settings.EMPTY, searchPlugins); // null decider is filtered out, so 0 deciders - assertEquals(searchModule.getConcurrentSearchDeciders().size(), 0); + assertEquals(searchModule.getConcurrentSearchRequestDeciderFactories().size(), 0); } diff --git a/test/framework/src/main/java/org/opensearch/node/MockNode.java b/test/framework/src/main/java/org/opensearch/node/MockNode.java index 09df9b85320f0..7e81630d14383 100644 --- a/test/framework/src/main/java/org/opensearch/node/MockNode.java +++ b/test/framework/src/main/java/org/opensearch/node/MockNode.java @@ -57,7 +57,7 @@ import org.opensearch.script.ScriptService; import org.opensearch.search.MockSearchService; import org.opensearch.search.SearchService; -import org.opensearch.search.deciders.ConcurrentSearchDecider; +import org.opensearch.search.deciders.ConcurrentSearchRequestDecider; import org.opensearch.search.fetch.FetchPhase; import org.opensearch.search.query.QueryPhase; import org.opensearch.tasks.TaskResourceTrackingService; @@ -74,6 +74,7 @@ import java.util.Collections; import java.util.Map; import java.util.Set; +import java.util.Optional; import java.util.concurrent.Executor; import java.util.function.Function; @@ -158,7 +159,7 @@ protected SearchService newSearchService( CircuitBreakerService circuitBreakerService, Executor indexSearcherExecutor, TaskResourceTrackingService taskResourceTrackingService, - Collection concurrentSearchDecidersList + Collection> concurrentSearchDeciderFactories ) { if (getPluginsService().filterPlugins(MockSearchService.TestPlugin.class).isEmpty()) { return super.newSearchService( @@ -173,7 +174,7 @@ protected SearchService newSearchService( circuitBreakerService, indexSearcherExecutor, taskResourceTrackingService, - concurrentSearchDecidersList + concurrentSearchDeciderFactories ); } return new MockSearchService(