-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Initial search pipelines implementation #6587
Merged
Merged
Changes from 13 commits
Commits
Show all changes
18 commits
Select commit
Hold shift + click to select a range
0299907
Initial search pipelines implementation
msfroh 096a83a
Merge branch 'main' into search_pipelines
msfroh 425120b
Incorporate feedback from @reta and @navneet1v
msfroh bb1da6b
Register SearchPipelineProcessingException
msfroh 6f020ed
Remove unneeded dependencies from search-pipeline-common
msfroh 84c6d04
Merge branch 'main' into search_pipelines
msfroh 03eef02
Avoid cloning SearchRequest if no SearchRequestProcessors
msfroh 2019fdf
Use NamedWritableRegistry to deserialize SearchRequest
msfroh 7e32d6a
Check for empty pipeline with CollectionUtils.isEmpty
msfroh ba99f9c
Update server/src/main/java/org/opensearch/search/pipeline/SearchPipe…
msfroh ad4df47
Merge branch 'opensearch-project:main' into search_pipelines
msfroh 1379045
Incorporate feedback from @noCharger
msfroh 0a2203a
Incorporate feedback from @reta
msfroh 13a0419
Gate search pipelines behind a feature flag
msfroh 8e1ba27
More feature flag fixes for search pipeline testing
msfroh 179e4aa
Merge branch 'main' into search_pipelines
msfroh f49542c
Move feature flag into constructor parameter
msfroh 402e8b6
Move REST handlers behind feature flag
msfroh File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
149 changes: 149 additions & 0 deletions
149
client/rest-high-level/src/main/java/org/opensearch/client/SearchPipelineClient.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,149 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.client; | ||
|
||
import org.opensearch.action.ActionListener; | ||
import org.opensearch.action.search.DeleteSearchPipelineRequest; | ||
import org.opensearch.action.search.GetSearchPipelineRequest; | ||
import org.opensearch.action.search.GetSearchPipelineResponse; | ||
import org.opensearch.action.search.PutSearchPipelineRequest; | ||
import org.opensearch.action.support.master.AcknowledgedResponse; | ||
|
||
import java.io.IOException; | ||
import java.util.Collections; | ||
|
||
import static java.util.Collections.emptySet; | ||
|
||
public final class SearchPipelineClient { | ||
private final RestHighLevelClient restHighLevelClient; | ||
|
||
SearchPipelineClient(RestHighLevelClient restHighLevelClient) { | ||
this.restHighLevelClient = restHighLevelClient; | ||
} | ||
|
||
/** | ||
* Add a pipeline or update an existing pipeline. | ||
* | ||
* @param request the request | ||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized | ||
* @return the response | ||
* @throws IOException in case there is a problem sending the request or parsing back the response | ||
*/ | ||
public AcknowledgedResponse put(PutSearchPipelineRequest request, RequestOptions options) throws IOException { | ||
return restHighLevelClient.performRequestAndParseEntity( | ||
request, | ||
SearchPipelinesRequestConverters::putPipeline, | ||
options, | ||
AcknowledgedResponse::fromXContent, | ||
emptySet() | ||
); | ||
} | ||
|
||
/** | ||
* Asynchronously add a pipeline or update an existing pipeline. | ||
* | ||
* @param request the request | ||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized | ||
* @param listener the listener to be notified upon request completion | ||
* @return cancellable that may be used to cancel the request | ||
*/ | ||
public Cancellable putAsync(PutSearchPipelineRequest request, RequestOptions options, ActionListener<AcknowledgedResponse> listener) { | ||
return restHighLevelClient.performRequestAsyncAndParseEntity( | ||
request, | ||
SearchPipelinesRequestConverters::putPipeline, | ||
options, | ||
AcknowledgedResponse::fromXContent, | ||
listener, | ||
emptySet() | ||
); | ||
} | ||
|
||
/** | ||
* Get existing pipelines. | ||
* | ||
* @param request the request | ||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized | ||
* @return the response | ||
* @throws IOException in case there is a problem sending the request or parsing back the response | ||
*/ | ||
public GetSearchPipelineResponse get(GetSearchPipelineRequest request, RequestOptions options) throws IOException { | ||
return restHighLevelClient.performRequestAndParseEntity( | ||
request, | ||
SearchPipelinesRequestConverters::getPipeline, | ||
options, | ||
GetSearchPipelineResponse::fromXContent, | ||
Collections.singleton(404) | ||
); | ||
} | ||
|
||
/** | ||
* Asynchronously get existing pipelines. | ||
* | ||
* @param request the request | ||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized | ||
* @param listener the listener to be notified upon request completion | ||
* @return cancellable that may be used to cancel the request | ||
*/ | ||
public Cancellable getAsync( | ||
GetSearchPipelineRequest request, | ||
RequestOptions options, | ||
ActionListener<GetSearchPipelineResponse> listener | ||
) { | ||
return restHighLevelClient.performRequestAsyncAndParseEntity( | ||
request, | ||
SearchPipelinesRequestConverters::getPipeline, | ||
options, | ||
GetSearchPipelineResponse::fromXContent, | ||
listener, | ||
Collections.singleton(404) | ||
); | ||
} | ||
|
||
/** | ||
* Delete an existing pipeline. | ||
* | ||
* @param request the request | ||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized | ||
* @return the response | ||
* @throws IOException in case there is a problem sending the request or parsing back the response | ||
*/ | ||
public AcknowledgedResponse delete(DeleteSearchPipelineRequest request, RequestOptions options) throws IOException { | ||
return restHighLevelClient.performRequestAndParseEntity( | ||
request, | ||
SearchPipelinesRequestConverters::deletePipeline, | ||
options, | ||
AcknowledgedResponse::fromXContent, | ||
emptySet() | ||
); | ||
} | ||
|
||
/** | ||
* Asynchronously delete an existing pipeline. | ||
* | ||
* @param request the request | ||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized | ||
* @param listener the listener to be notified upon request completion | ||
* @return cancellable that may be used to cancel the request | ||
*/ | ||
public Cancellable deleteAsync( | ||
DeleteSearchPipelineRequest request, | ||
RequestOptions options, | ||
ActionListener<AcknowledgedResponse> listener | ||
) { | ||
return restHighLevelClient.performRequestAsyncAndParseEntity( | ||
request, | ||
SearchPipelinesRequestConverters::deletePipeline, | ||
options, | ||
AcknowledgedResponse::fromXContent, | ||
listener, | ||
emptySet() | ||
); | ||
} | ||
|
||
} |
61 changes: 61 additions & 0 deletions
61
...rest-high-level/src/main/java/org/opensearch/client/SearchPipelinesRequestConverters.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.client; | ||
|
||
import org.apache.hc.client5.http.classic.methods.HttpDelete; | ||
import org.apache.hc.client5.http.classic.methods.HttpGet; | ||
import org.apache.hc.client5.http.classic.methods.HttpPut; | ||
import org.opensearch.action.search.DeleteSearchPipelineRequest; | ||
import org.opensearch.action.search.GetSearchPipelineRequest; | ||
import org.opensearch.action.search.PutSearchPipelineRequest; | ||
|
||
import java.io.IOException; | ||
|
||
final class SearchPipelinesRequestConverters { | ||
private SearchPipelinesRequestConverters() {} | ||
|
||
static Request putPipeline(PutSearchPipelineRequest putPipelineRequest) throws IOException { | ||
String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_search/pipeline") | ||
.addPathPart(putPipelineRequest.getId()) | ||
.build(); | ||
Request request = new Request(HttpPut.METHOD_NAME, endpoint); | ||
|
||
RequestConverters.Params params = new RequestConverters.Params(); | ||
params.withTimeout(putPipelineRequest.timeout()); | ||
params.withClusterManagerTimeout(putPipelineRequest.clusterManagerNodeTimeout()); | ||
request.addParameters(params.asMap()); | ||
request.setEntity(RequestConverters.createEntity(putPipelineRequest, RequestConverters.REQUEST_BODY_CONTENT_TYPE)); | ||
return request; | ||
} | ||
|
||
static Request deletePipeline(DeleteSearchPipelineRequest deletePipelineRequest) { | ||
String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_search/pipeline") | ||
.addPathPart(deletePipelineRequest.getId()) | ||
.build(); | ||
Request request = new Request(HttpDelete.METHOD_NAME, endpoint); | ||
|
||
RequestConverters.Params parameters = new RequestConverters.Params(); | ||
parameters.withTimeout(deletePipelineRequest.timeout()); | ||
parameters.withClusterManagerTimeout(deletePipelineRequest.clusterManagerNodeTimeout()); | ||
request.addParameters(parameters.asMap()); | ||
return request; | ||
} | ||
|
||
static Request getPipeline(GetSearchPipelineRequest getPipelineRequest) { | ||
String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_search/pipeline") | ||
.addCommaSeparatedPathParts(getPipelineRequest.getIds()) | ||
.build(); | ||
Request request = new Request(HttpGet.METHOD_NAME, endpoint); | ||
|
||
RequestConverters.Params parameters = new RequestConverters.Params(); | ||
parameters.withClusterManagerTimeout(getPipelineRequest.clusterManagerNodeTimeout()); | ||
request.addParameters(parameters.asMap()); | ||
return request; | ||
} | ||
} |
115 changes: 115 additions & 0 deletions
115
client/rest-high-level/src/test/java/org/opensearch/client/SearchPipelineClientIT.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.client; | ||
|
||
import org.opensearch.action.search.DeleteSearchPipelineRequest; | ||
import org.opensearch.action.search.GetSearchPipelineRequest; | ||
import org.opensearch.action.search.GetSearchPipelineResponse; | ||
import org.opensearch.action.search.PutSearchPipelineRequest; | ||
import org.opensearch.action.support.master.AcknowledgedResponse; | ||
import org.opensearch.common.bytes.BytesReference; | ||
import org.opensearch.common.xcontent.XContentType; | ||
import org.opensearch.core.xcontent.XContentBuilder; | ||
|
||
import java.io.IOException; | ||
|
||
public class SearchPipelineClientIT extends OpenSearchRestHighLevelClientTestCase { | ||
|
||
public void testPutPipeline() throws IOException { | ||
String id = "some_pipeline_id"; | ||
XContentBuilder pipelineBuilder = buildSearchPipeline(); | ||
PutSearchPipelineRequest request = new PutSearchPipelineRequest( | ||
id, | ||
BytesReference.bytes(pipelineBuilder), | ||
pipelineBuilder.contentType() | ||
); | ||
createPipeline(request); | ||
} | ||
|
||
private static void createPipeline(PutSearchPipelineRequest request) throws IOException { | ||
AcknowledgedResponse response = execute( | ||
request, | ||
highLevelClient().searchPipeline()::put, | ||
highLevelClient().searchPipeline()::putAsync | ||
); | ||
assertTrue(response.isAcknowledged()); | ||
} | ||
|
||
public void testGetPipeline() throws IOException { | ||
String id = "some_pipeline_id"; | ||
XContentBuilder pipelineBuilder = buildSearchPipeline(); | ||
PutSearchPipelineRequest request = new PutSearchPipelineRequest( | ||
id, | ||
BytesReference.bytes(pipelineBuilder), | ||
pipelineBuilder.contentType() | ||
); | ||
createPipeline(request); | ||
|
||
GetSearchPipelineRequest getRequest = new GetSearchPipelineRequest(id); | ||
GetSearchPipelineResponse response = execute( | ||
getRequest, | ||
highLevelClient().searchPipeline()::get, | ||
highLevelClient().searchPipeline()::getAsync | ||
); | ||
assertTrue(response.isFound()); | ||
assertEquals(1, response.pipelines().size()); | ||
assertEquals(id, response.pipelines().get(0).getId()); | ||
} | ||
|
||
public void testDeletePipeline() throws IOException { | ||
String id = "some_pipeline_id"; | ||
XContentBuilder pipelineBuilder = buildSearchPipeline(); | ||
PutSearchPipelineRequest request = new PutSearchPipelineRequest( | ||
id, | ||
BytesReference.bytes(pipelineBuilder), | ||
pipelineBuilder.contentType() | ||
); | ||
createPipeline(request); | ||
|
||
DeleteSearchPipelineRequest deleteRequest = new DeleteSearchPipelineRequest(id); | ||
AcknowledgedResponse response = execute( | ||
deleteRequest, | ||
highLevelClient().searchPipeline()::delete, | ||
highLevelClient().searchPipeline()::deleteAsync | ||
); | ||
assertTrue(response.isAcknowledged()); | ||
} | ||
|
||
private static XContentBuilder buildSearchPipeline() throws IOException { | ||
XContentType xContentType = randomFrom(XContentType.values()); | ||
XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent()); | ||
return buildSearchPipeline(pipelineBuilder); | ||
} | ||
|
||
private static XContentBuilder buildSearchPipeline(XContentBuilder builder) throws IOException { | ||
builder.startObject(); | ||
{ | ||
builder.field("description", "a pipeline description"); | ||
builder.startArray("request_processors"); | ||
{ | ||
builder.startObject().startObject("filter_query"); | ||
{ | ||
builder.startObject("query"); | ||
{ | ||
builder.startObject("term"); | ||
{ | ||
builder.field("field", "value"); | ||
} | ||
builder.endObject(); | ||
} | ||
builder.endObject(); | ||
} | ||
builder.endObject().endObject(); | ||
} | ||
builder.endArray(); | ||
} | ||
builder.endObject(); | ||
return builder; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
* | ||
* Modifications Copyright OpenSearch Contributors. See | ||
* GitHub history for details. | ||
*/ | ||
|
||
apply plugin: 'opensearch.yaml-rest-test' | ||
apply plugin: 'opensearch.internal-cluster-test' | ||
|
||
opensearchplugin { | ||
description 'Module for search pipeline processors that do not require additional security permissions or have large dependencies and resources' | ||
classname 'org.opensearch.search.pipeline.common.SearchPipelineCommonModulePlugin' | ||
} | ||
|
||
dependencies { | ||
} | ||
|
||
restResources { | ||
restApi { | ||
includeCore '_common', 'search_pipeline', 'cluster', 'indices', 'index', 'nodes', 'search' | ||
} | ||
} | ||
|
||
testClusters.all { | ||
} | ||
|
||
thirdPartyAudit.ignoreMissingClasses( | ||
// from log4j | ||
'org.osgi.framework.AdaptPermission', | ||
'org.osgi.framework.AdminPermission', | ||
'org.osgi.framework.Bundle', | ||
'org.osgi.framework.BundleActivator', | ||
'org.osgi.framework.BundleContext', | ||
'org.osgi.framework.BundleEvent', | ||
'org.osgi.framework.SynchronousBundleListener', | ||
'org.osgi.framework.wiring.BundleWire', | ||
'org.osgi.framework.wiring.BundleWiring' | ||
) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
?