-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Initial search pipelines implementation #6587
Changes from 2 commits
0299907
096a83a
425120b
bb1da6b
6f020ed
84c6d04
03eef02
2019fdf
7e32d6a
ba99f9c
ad4df47
1379045
0a2203a
13a0419
8e1ba27
179e4aa
f49542c
402e8b6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,153 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.client; | ||
|
||
import org.opensearch.action.ActionListener; | ||
import org.opensearch.action.search.DeleteSearchPipelineRequest; | ||
import org.opensearch.action.search.GetSearchPipelineRequest; | ||
import org.opensearch.action.search.GetSearchPipelineResponse; | ||
import org.opensearch.action.search.PutSearchPipelineRequest; | ||
import org.opensearch.action.support.master.AcknowledgedResponse; | ||
|
||
import java.io.IOException; | ||
import java.util.Collections; | ||
|
||
import static java.util.Collections.emptySet; | ||
|
||
public final class SearchPipelinesClient { | ||
private final RestHighLevelClient restHighLevelClient; | ||
|
||
SearchPipelinesClient(RestHighLevelClient restHighLevelClient) { | ||
this.restHighLevelClient = restHighLevelClient; | ||
} | ||
|
||
/** | ||
* Add a pipeline or update an existing pipeline. | ||
* | ||
* @param request the request | ||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized | ||
* @return the response | ||
* @throws IOException in case there is a problem sending the request or parsing back the response | ||
*/ | ||
public AcknowledgedResponse putPipeline(PutSearchPipelineRequest request, RequestOptions options) throws IOException { | ||
return restHighLevelClient.performRequestAndParseEntity( | ||
request, | ||
SearchPipelinesRequestConverters::putPipeline, | ||
options, | ||
AcknowledgedResponse::fromXContent, | ||
emptySet() | ||
); | ||
} | ||
|
||
/** | ||
* Asynchronously add a pipeline or update an existing pipeline. | ||
* | ||
* @param request the request | ||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized | ||
* @param listener the listener to be notified upon request completion | ||
* @return cancellable that may be used to cancel the request | ||
*/ | ||
public Cancellable putPipelineAsync( | ||
PutSearchPipelineRequest request, | ||
RequestOptions options, | ||
ActionListener<AcknowledgedResponse> listener | ||
) { | ||
return restHighLevelClient.performRequestAsyncAndParseEntity( | ||
request, | ||
SearchPipelinesRequestConverters::putPipeline, | ||
options, | ||
AcknowledgedResponse::fromXContent, | ||
listener, | ||
emptySet() | ||
); | ||
} | ||
|
||
/** | ||
* Get an existing pipeline. | ||
msfroh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* | ||
* @param request the request | ||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized | ||
* @return the response | ||
* @throws IOException in case there is a problem sending the request or parsing back the response | ||
*/ | ||
public GetSearchPipelineResponse getPipeline(GetSearchPipelineRequest request, RequestOptions options) throws IOException { | ||
return restHighLevelClient.performRequestAndParseEntity( | ||
request, | ||
SearchPipelinesRequestConverters::getPipeline, | ||
options, | ||
GetSearchPipelineResponse::fromXContent, | ||
Collections.singleton(404) | ||
); | ||
} | ||
|
||
/** | ||
* Asynchronously get an existing pipeline. | ||
* | ||
* @param request the request | ||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized | ||
* @param listener the listener to be notified upon request completion | ||
* @return cancellable that may be used to cancel the request | ||
*/ | ||
public Cancellable getPipelineAsync( | ||
GetSearchPipelineRequest request, | ||
RequestOptions options, | ||
ActionListener<GetSearchPipelineResponse> listener | ||
) { | ||
return restHighLevelClient.performRequestAsyncAndParseEntity( | ||
request, | ||
SearchPipelinesRequestConverters::getPipeline, | ||
options, | ||
GetSearchPipelineResponse::fromXContent, | ||
listener, | ||
Collections.singleton(404) | ||
); | ||
} | ||
|
||
/** | ||
* Delete an existing pipeline. | ||
* | ||
* @param request the request | ||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized | ||
* @return the response | ||
* @throws IOException in case there is a problem sending the request or parsing back the response | ||
*/ | ||
public AcknowledgedResponse deletePipeline(DeleteSearchPipelineRequest request, RequestOptions options) throws IOException { | ||
return restHighLevelClient.performRequestAndParseEntity( | ||
request, | ||
SearchPipelinesRequestConverters::deletePipeline, | ||
options, | ||
AcknowledgedResponse::fromXContent, | ||
emptySet() | ||
); | ||
} | ||
|
||
/** | ||
* Asynchronously delete an existing pipeline. | ||
* | ||
* @param request the request | ||
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized | ||
* @param listener the listener to be notified upon request completion | ||
* @return cancellable that may be used to cancel the request | ||
*/ | ||
public Cancellable deletePipelineAsync( | ||
DeleteSearchPipelineRequest request, | ||
RequestOptions options, | ||
ActionListener<AcknowledgedResponse> listener | ||
) { | ||
return restHighLevelClient.performRequestAsyncAndParseEntity( | ||
request, | ||
SearchPipelinesRequestConverters::deletePipeline, | ||
options, | ||
AcknowledgedResponse::fromXContent, | ||
listener, | ||
emptySet() | ||
); | ||
} | ||
|
||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we have plan to support There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since a search is inherently read-only, it doesn't need a dedicated What I would like to borrow from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One potential use case for |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,61 @@ | ||||||
/* | ||||||
* SPDX-License-Identifier: Apache-2.0 | ||||||
* | ||||||
* The OpenSearch Contributors require contributions made to | ||||||
* this file be licensed under the Apache-2.0 license or a | ||||||
* compatible open source license. | ||||||
*/ | ||||||
|
||||||
package org.opensearch.client; | ||||||
|
||||||
import org.apache.hc.client5.http.classic.methods.HttpDelete; | ||||||
import org.apache.hc.client5.http.classic.methods.HttpGet; | ||||||
import org.apache.hc.client5.http.classic.methods.HttpPut; | ||||||
import org.opensearch.action.search.DeleteSearchPipelineRequest; | ||||||
import org.opensearch.action.search.GetSearchPipelineRequest; | ||||||
import org.opensearch.action.search.PutSearchPipelineRequest; | ||||||
|
||||||
import java.io.IOException; | ||||||
|
||||||
final class SearchPipelinesRequestConverters { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
? |
||||||
private SearchPipelinesRequestConverters() {} | ||||||
|
||||||
static Request putPipeline(PutSearchPipelineRequest putPipelineRequest) throws IOException { | ||||||
String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_search/pipeline") | ||||||
.addPathPart(putPipelineRequest.getId()) | ||||||
.build(); | ||||||
Request request = new Request(HttpPut.METHOD_NAME, endpoint); | ||||||
|
||||||
RequestConverters.Params params = new RequestConverters.Params(); | ||||||
params.withTimeout(putPipelineRequest.timeout()); | ||||||
params.withClusterManagerTimeout(putPipelineRequest.clusterManagerNodeTimeout()); | ||||||
request.addParameters(params.asMap()); | ||||||
request.setEntity(RequestConverters.createEntity(putPipelineRequest, RequestConverters.REQUEST_BODY_CONTENT_TYPE)); | ||||||
return request; | ||||||
} | ||||||
|
||||||
static Request deletePipeline(DeleteSearchPipelineRequest deletePipelineRequest) { | ||||||
String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_search/pipeline") | ||||||
.addPathPart(deletePipelineRequest.getId()) | ||||||
.build(); | ||||||
Request request = new Request(HttpDelete.METHOD_NAME, endpoint); | ||||||
|
||||||
RequestConverters.Params parameters = new RequestConverters.Params(); | ||||||
parameters.withTimeout(deletePipelineRequest.timeout()); | ||||||
parameters.withClusterManagerTimeout(deletePipelineRequest.clusterManagerNodeTimeout()); | ||||||
request.addParameters(parameters.asMap()); | ||||||
return request; | ||||||
} | ||||||
|
||||||
static Request getPipeline(GetSearchPipelineRequest getPipelineRequest) { | ||||||
String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_search/pipeline") | ||||||
.addCommaSeparatedPathParts(getPipelineRequest.getIds()) | ||||||
.build(); | ||||||
Request request = new Request(HttpGet.METHOD_NAME, endpoint); | ||||||
|
||||||
RequestConverters.Params parameters = new RequestConverters.Params(); | ||||||
parameters.withClusterManagerTimeout(getPipelineRequest.clusterManagerNodeTimeout()); | ||||||
request.addParameters(parameters.asMap()); | ||||||
return request; | ||||||
} | ||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.client; | ||
|
||
import org.opensearch.action.search.DeleteSearchPipelineRequest; | ||
import org.opensearch.action.search.GetSearchPipelineRequest; | ||
import org.opensearch.action.search.GetSearchPipelineResponse; | ||
import org.opensearch.action.search.PutSearchPipelineRequest; | ||
import org.opensearch.action.support.master.AcknowledgedResponse; | ||
import org.opensearch.common.bytes.BytesReference; | ||
import org.opensearch.common.xcontent.XContentType; | ||
import org.opensearch.core.xcontent.XContentBuilder; | ||
import org.opensearch.search.pipeline.Pipeline; | ||
|
||
import java.io.IOException; | ||
|
||
public class SearchPipelinesClientIT extends OpenSearchRestHighLevelClientTestCase { | ||
|
||
public void testPutPipeline() throws IOException { | ||
String id = "some_pipeline_id"; | ||
XContentBuilder pipelineBuilder = buildSearchPipeline(); | ||
PutSearchPipelineRequest request = new PutSearchPipelineRequest( | ||
id, | ||
BytesReference.bytes(pipelineBuilder), | ||
pipelineBuilder.contentType() | ||
); | ||
createPipeline(request); | ||
} | ||
|
||
private static void createPipeline(PutSearchPipelineRequest request) throws IOException { | ||
AcknowledgedResponse response = execute( | ||
request, | ||
highLevelClient().searchPipelines()::putPipeline, | ||
highLevelClient().searchPipelines()::putPipelineAsync | ||
); | ||
assertTrue(response.isAcknowledged()); | ||
} | ||
|
||
public void testGetPipeline() throws IOException { | ||
String id = "some_pipeline_id"; | ||
XContentBuilder pipelineBuilder = buildSearchPipeline(); | ||
PutSearchPipelineRequest request = new PutSearchPipelineRequest( | ||
id, | ||
BytesReference.bytes(pipelineBuilder), | ||
pipelineBuilder.contentType() | ||
); | ||
createPipeline(request); | ||
|
||
GetSearchPipelineRequest getRequest = new GetSearchPipelineRequest(id); | ||
GetSearchPipelineResponse response = execute( | ||
getRequest, | ||
highLevelClient().searchPipelines()::getPipeline, | ||
highLevelClient().searchPipelines()::getPipelineAsync | ||
); | ||
assertTrue(response.isFound()); | ||
assertEquals(1, response.pipelines().size()); | ||
assertEquals(id, response.pipelines().get(0).getId()); | ||
} | ||
|
||
public void testDeletePipeline() throws IOException { | ||
String id = "some_pipeline_id"; | ||
XContentBuilder pipelineBuilder = buildSearchPipeline(); | ||
PutSearchPipelineRequest request = new PutSearchPipelineRequest( | ||
id, | ||
BytesReference.bytes(pipelineBuilder), | ||
pipelineBuilder.contentType() | ||
); | ||
createPipeline(request); | ||
|
||
DeleteSearchPipelineRequest deleteRequest = new DeleteSearchPipelineRequest(id); | ||
AcknowledgedResponse response = execute( | ||
deleteRequest, | ||
highLevelClient().searchPipelines()::deletePipeline, | ||
highLevelClient().searchPipelines()::deletePipelineAsync | ||
); | ||
assertTrue(response.isAcknowledged()); | ||
} | ||
|
||
private static XContentBuilder buildSearchPipeline() throws IOException { | ||
XContentType xContentType = randomFrom(XContentType.values()); | ||
XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent()); | ||
return buildSearchPipeline(pipelineBuilder); | ||
} | ||
|
||
private static XContentBuilder buildSearchPipeline(XContentBuilder builder) throws IOException { | ||
builder.startObject(); | ||
{ | ||
builder.field("description", "a pipeline description"); | ||
builder.startArray(Pipeline.REQUEST_PROCESSORS_KEY); | ||
{ | ||
builder.startObject().startObject("filter_query"); | ||
{ | ||
builder.startObject("query"); | ||
{ | ||
builder.startObject("term"); | ||
{ | ||
builder.field("field", "value"); | ||
} | ||
builder.endObject(); | ||
} | ||
builder.endObject(); | ||
} | ||
builder.endObject().endObject(); | ||
} | ||
builder.endArray(); | ||
} | ||
builder.endObject(); | ||
return builder; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@msfroh I would suggest to settle on same naming conventions, there is in use sporadically plural
SearchPipelinesClient
and singularSearchPipelineService
forms, would be good to have it just concise everywhere.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right. I looked through the list of changed files, and I still see the plural form used for this client and the associated YAML tests. I'll update those to use the singular form.
Right now,
SearchPipelinesInfo
doesn't have information about any search pipelines (singular or plural), but rather keeps track of the available search pipeline processors, so that we can validate on pipeline creation that all nodes have all processors mentioned in the pipeline. Given that it describes information at the node level, I don't think it would ever hold information about search pipelines themselves (whether a single pipeline or all pipelines), since the pipelines themselves are defined at the cluster level. So, after that roundabout thought process, I think I'll rename it fromSearchPipelinesInfo
toSearchPipelineInfo
, since it's ultimately carrying information about the node-levelSearchPipelineService
.