Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial search pipelines implementation #6587

Merged
merged 18 commits into from
Apr 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@

import static java.util.stream.Collectors.toList;
import static org.opensearch.common.util.FeatureFlags.REPLICATION_TYPE;
import static org.opensearch.common.util.FeatureFlags.SEARCH_PIPELINE;
import static org.opensearch.env.NodeEnvironment.collectFileCacheDataPath;
import static org.opensearch.index.ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY;

Expand Down Expand Up @@ -979,7 +980,8 @@ protected Node(
xContentRegistry,
namedWriteableRegistry,
pluginsService.filterPlugins(SearchPipelinePlugin.class),
client
client,
FeatureFlags.isEnabled(SEARCH_PIPELINE)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

);
this.nodeService = new NodeService(
settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.opensearch.common.regex.Regex;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.CollectionUtils;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
Expand Down Expand Up @@ -79,7 +78,7 @@ public class SearchPipelineService implements ClusterStateApplier, ReportingServ
private final NamedWriteableRegistry namedWriteableRegistry;
private volatile ClusterState state;

private boolean forceEnabled = false;
private final boolean isEnabled;

public SearchPipelineService(
ClusterService clusterService,
Expand All @@ -90,7 +89,8 @@ public SearchPipelineService(
NamedXContentRegistry namedXContentRegistry,
NamedWriteableRegistry namedWriteableRegistry,
List<SearchPipelinePlugin> searchPipelinePlugins,
Client client
Client client,
boolean isEnabled
) {
this.clusterService = clusterService;
this.scriptService = scriptService;
Expand All @@ -113,6 +113,7 @@ public SearchPipelineService(
);
putPipelineTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.PUT_SEARCH_PIPELINE_KEY, true);
deletePipelineTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_SEARCH_PIPELINE_KEY, true);
this.isEnabled = isEnabled;
}

private static Map<String, Processor.Factory> processorFactories(
Expand Down Expand Up @@ -214,7 +215,7 @@ public void putPipeline(
PutSearchPipelineRequest request,
ActionListener<AcknowledgedResponse> listener
) throws Exception {
if (isFeatureEnabled() == false) {
if (isEnabled == false) {
throw new IllegalArgumentException("Experimental search pipeline feature is not enabled");
}

Expand Down Expand Up @@ -332,7 +333,7 @@ static ClusterState innerDelete(DeleteSearchPipelineRequest request, ClusterStat

public SearchRequest transformRequest(SearchRequest originalRequest) {
String pipelineId = originalRequest.pipeline();
if (pipelineId != null && isFeatureEnabled()) {
if (pipelineId != null && isEnabled) {
PipelineHolder pipeline = pipelines.get(pipelineId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-blocking nit - if we want to keep the name of PipelineHolder , rename pipeline to pipelineholder to avoid invoke pipeline.pipeline

if (pipeline == null) {
throw new IllegalArgumentException("Pipeline " + pipelineId + " is not defined");
Expand All @@ -355,7 +356,7 @@ public SearchRequest transformRequest(SearchRequest originalRequest) {

public SearchResponse transformResponse(SearchRequest request, SearchResponse searchResponse) {
String pipelineId = request.pipeline();
if (pipelineId != null && isFeatureEnabled()) {
if (pipelineId != null && isEnabled) {
PipelineHolder pipeline = pipelines.get(pipelineId);
if (pipeline == null) {
throw new IllegalArgumentException("Pipeline " + pipelineId + " is not defined");
Expand Down Expand Up @@ -428,12 +429,4 @@ static class PipelineHolder {
this.pipeline = Objects.requireNonNull(pipeline);
}
}

private boolean isFeatureEnabled() {
return forceEnabled || FeatureFlags.isEnabled(FeatureFlags.SEARCH_PIPELINE);
}

void setForceEnabled(boolean forceEnabled) {
this.forceEnabled = forceEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public void testSearchPipelinePlugin() {
this.xContentRegistry(),
this.writableRegistry(),
List.of(DUMMY_PLUGIN),
client
client,
false
);
Map<String, Processor.Factory> factories = searchPipelineService.getProcessorFactories();
assertEquals(1, factories.size());
Expand All @@ -104,7 +105,8 @@ public void testSearchPipelinePluginDuplicate() {
this.xContentRegistry(),
this.writableRegistry(),
List.of(DUMMY_PLUGIN, DUMMY_PLUGIN),
client
client,
false
)
);
assertTrue(e.getMessage(), e.getMessage().contains(" already registered"));
Expand All @@ -121,9 +123,9 @@ public void testExecuteSearchPipelineDoesNotExist() {
this.xContentRegistry(),
this.writableRegistry(),
List.of(DUMMY_PLUGIN),
client
client,
true
);
searchPipelineService.setForceEnabled(true);
final SearchRequest searchRequest = new SearchRequest("_index").pipeline("bar");
IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
Expand Down Expand Up @@ -233,9 +235,9 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
return processors;
}
}),
client
client,
true
);
searchPipelineService.setForceEnabled(true);
return searchPipelineService;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2101,7 +2101,8 @@ public void onFailure(final Exception e) {
namedXContentRegistry,
namedWriteableRegistry,
List.of(),
client
client,
false
)
)
);
Expand Down