Initial search pipelines implementation #6587

Merged: 18 commits, Apr 10, 2023
Changes from 2 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -79,6 +79,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- The truncation limit of the OpenSearchJsonLayout logger is now configurable ([#6569](https://github.com/opensearch-project/OpenSearch/pull/6569))
- Add 'base_path' setting to File System Repository ([#6558](https://github.com/opensearch-project/OpenSearch/pull/6558))
- Return success on DeletePits when no PITs exist. ([#6544](https://github.com/opensearch-project/OpenSearch/pull/6544))
- Add initial search pipelines ([#6587](https://github.com/opensearch-project/OpenSearch/pull/6587))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490))

RestHighLevelClient.java
@@ -268,6 +268,7 @@ public class RestHighLevelClient implements Closeable {
private final IngestClient ingestClient = new IngestClient(this);
private final SnapshotClient snapshotClient = new SnapshotClient(this);
private final TasksClient tasksClient = new TasksClient(this);
private final SearchPipelinesClient searchPipelinesClient = new SearchPipelinesClient(this);

/**
* Creates a {@link RestHighLevelClient} given the low level {@link RestClientBuilder} that allows to build the
@@ -354,6 +355,10 @@ public final TasksClient tasks() {
return tasksClient;
}

public final SearchPipelinesClient searchPipelines() {
return searchPipelinesClient;
}

/**
* Executes a bulk request using the Bulk API.
* @param bulkRequest the request

SearchPipelinesClient.java (new file)
@@ -0,0 +1,153 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client;

import org.opensearch.action.ActionListener;
import org.opensearch.action.search.DeleteSearchPipelineRequest;
import org.opensearch.action.search.GetSearchPipelineRequest;
import org.opensearch.action.search.GetSearchPipelineResponse;
import org.opensearch.action.search.PutSearchPipelineRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;

import java.io.IOException;
import java.util.Collections;

import static java.util.Collections.emptySet;

public final class SearchPipelinesClient {
@reta (Collaborator), Mar 31, 2023:

@msfroh I would suggest settling on a single naming convention: the plural form (SearchPipelinesClient) and the singular form (SearchPipelineService) are both used sporadically. It would be good to make this consistent everywhere.

Collaborator (Author):

You're right. I looked through the list of changed files, and I still see the plural form used for this client and the associated YAML tests. I'll update those to use the singular form.

Right now, SearchPipelinesInfo doesn't hold information about any search pipelines (singular or plural); rather, it keeps track of the available search pipeline processors, so that on pipeline creation we can validate that every node has all of the processors mentioned in the pipeline. Given that it describes node-level information, I don't think it would ever hold information about the pipelines themselves (whether a single pipeline or all of them), since pipelines are defined at the cluster level. So, after that roundabout thought process, I'll rename it from SearchPipelinesInfo to SearchPipelineInfo, since it ultimately carries information about the node-level SearchPipelineService.

private final RestHighLevelClient restHighLevelClient;

SearchPipelinesClient(RestHighLevelClient restHighLevelClient) {
this.restHighLevelClient = restHighLevelClient;
}

/**
* Add a pipeline or update an existing pipeline.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public AcknowledgedResponse putPipeline(PutSearchPipelineRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(
request,
SearchPipelinesRequestConverters::putPipeline,
options,
AcknowledgedResponse::fromXContent,
emptySet()
);
}

/**
* Asynchronously add a pipeline or update an existing pipeline.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return cancellable that may be used to cancel the request
*/
public Cancellable putPipelineAsync(
PutSearchPipelineRequest request,
RequestOptions options,
ActionListener<AcknowledgedResponse> listener
) {
return restHighLevelClient.performRequestAsyncAndParseEntity(
request,
SearchPipelinesRequestConverters::putPipeline,
options,
AcknowledgedResponse::fromXContent,
listener,
emptySet()
);
}

/**
* Get an existing pipeline.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public GetSearchPipelineResponse getPipeline(GetSearchPipelineRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(
request,
SearchPipelinesRequestConverters::getPipeline,
options,
GetSearchPipelineResponse::fromXContent,
Collections.singleton(404)
);
}

/**
* Asynchronously get an existing pipeline.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return cancellable that may be used to cancel the request
*/
public Cancellable getPipelineAsync(
GetSearchPipelineRequest request,
RequestOptions options,
ActionListener<GetSearchPipelineResponse> listener
) {
return restHighLevelClient.performRequestAsyncAndParseEntity(
request,
SearchPipelinesRequestConverters::getPipeline,
options,
GetSearchPipelineResponse::fromXContent,
listener,
Collections.singleton(404)
);
}

/**
* Delete an existing pipeline.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public AcknowledgedResponse deletePipeline(DeleteSearchPipelineRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(
request,
SearchPipelinesRequestConverters::deletePipeline,
options,
AcknowledgedResponse::fromXContent,
emptySet()
);
}

/**
* Asynchronously delete an existing pipeline.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return cancellable that may be used to cancel the request
*/
public Cancellable deletePipelineAsync(
DeleteSearchPipelineRequest request,
RequestOptions options,
ActionListener<AcknowledgedResponse> listener
) {
return restHighLevelClient.performRequestAsyncAndParseEntity(
request,
SearchPipelinesRequestConverters::deletePipeline,
options,
AcknowledgedResponse::fromXContent,
listener,
emptySet()
);
}

}
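
Added for illustration (not part of the PR diff): a minimal end-to-end sketch of the new client performing a create/get/delete round trip. It assumes an already-configured RestHighLevelClient, a hypothetical pipeline id my_pipeline, and that the request-processor key in the body is "request_processors" (i.e. the value of Pipeline.REQUEST_PROCESSORS_KEY); only methods shown elsewhere in this diff are used.

import org.opensearch.action.search.DeleteSearchPipelineRequest;
import org.opensearch.action.search.GetSearchPipelineRequest;
import org.opensearch.action.search.GetSearchPipelineResponse;
import org.opensearch.action.search.PutSearchPipelineRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.common.bytes.BytesArray;
import org.opensearch.common.xcontent.XContentType;

import java.io.IOException;

public final class SearchPipelineUsageExample {
    public static void run(RestHighLevelClient client) throws IOException {
        // Pipeline body with a single filter_query request processor, mirroring the IT below.
        // The "request_processors" key is assumed to match Pipeline.REQUEST_PROCESSORS_KEY.
        String body = "{"
            + "\"description\": \"a pipeline description\","
            + "\"request_processors\": [ { \"filter_query\": { \"query\": { \"term\": { \"field\": \"value\" } } } } ]"
            + "}";

        // Create (or update) the pipeline.
        AcknowledgedResponse putAck = client.searchPipelines()
            .putPipeline(new PutSearchPipelineRequest("my_pipeline", new BytesArray(body), XContentType.JSON), RequestOptions.DEFAULT);

        // Fetch it back; GetSearchPipelineResponse.pipelines() lists the matching configurations.
        GetSearchPipelineResponse getResponse = client.searchPipelines()
            .getPipeline(new GetSearchPipelineRequest("my_pipeline"), RequestOptions.DEFAULT);

        // Delete it again.
        AcknowledgedResponse deleteAck = client.searchPipelines()
            .deletePipeline(new DeleteSearchPipelineRequest("my_pipeline"), RequestOptions.DEFAULT);
    }
}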
Contributor:

Do we have a plan to support a simulate API, similar to ingest pipelines?

Collaborator (Author):

Since a search is inherently read-only, it doesn't need a dedicated simulate API per se. For ingest, you need to be able to ask "If I were to index this document with this pipeline, what would it look like?" without polluting your index. For search, you can just run queries through different pipelines and inspect the results.

What I would like to borrow from simulate, though, is the ability to define the pipeline in the request itself (as a lighter-weight option versus persisting a pipeline before trying it out). We have an open issue for that: #6717
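
As a purely illustrative sketch of that comparison workflow (the search_pipeline query parameter shown here is an assumption about how a pipeline is referenced at search time and is not confirmed by this diff), one could run the same query with and without a pipeline and diff the hits:

GET /my-index/_search?search_pipeline=my_pipeline
GET /my-index/_search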

Contributor:

One potential use case for simulate: output the request rewritten by a script processor.


SearchPipelinesRequestConverters.java (new file)
@@ -0,0 +1,61 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client;

import org.apache.hc.client5.http.classic.methods.HttpDelete;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.classic.methods.HttpPut;
import org.opensearch.action.search.DeleteSearchPipelineRequest;
import org.opensearch.action.search.GetSearchPipelineRequest;
import org.opensearch.action.search.PutSearchPipelineRequest;

import java.io.IOException;

final class SearchPipelinesRequestConverters {
Collaborator:

Suggested change:
- final class SearchPipelinesRequestConverters {
+ final class SearchPipelineRequestConverters {

?

private SearchPipelinesRequestConverters() {}

static Request putPipeline(PutSearchPipelineRequest putPipelineRequest) throws IOException {
String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_search/pipeline")
.addPathPart(putPipelineRequest.getId())
.build();
Request request = new Request(HttpPut.METHOD_NAME, endpoint);

RequestConverters.Params params = new RequestConverters.Params();
params.withTimeout(putPipelineRequest.timeout());
params.withClusterManagerTimeout(putPipelineRequest.clusterManagerNodeTimeout());
request.addParameters(params.asMap());
request.setEntity(RequestConverters.createEntity(putPipelineRequest, RequestConverters.REQUEST_BODY_CONTENT_TYPE));
return request;
}

static Request deletePipeline(DeleteSearchPipelineRequest deletePipelineRequest) {
String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_search/pipeline")
.addPathPart(deletePipelineRequest.getId())
.build();
Request request = new Request(HttpDelete.METHOD_NAME, endpoint);

RequestConverters.Params parameters = new RequestConverters.Params();
parameters.withTimeout(deletePipelineRequest.timeout());
parameters.withClusterManagerTimeout(deletePipelineRequest.clusterManagerNodeTimeout());
request.addParameters(parameters.asMap());
return request;
}

static Request getPipeline(GetSearchPipelineRequest getPipelineRequest) {
String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_search/pipeline")
.addCommaSeparatedPathParts(getPipelineRequest.getIds())
.build();
Request request = new Request(HttpGet.METHOD_NAME, endpoint);

RequestConverters.Params parameters = new RequestConverters.Params();
parameters.withClusterManagerTimeout(getPipelineRequest.clusterManagerNodeTimeout());
request.addParameters(parameters.asMap());
return request;
}
}
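
Taken together, these converters map the new transport requests onto the REST endpoints added by this PR: PUT _search/pipeline/{id}, GET _search/pipeline/{ids}, and DELETE _search/pipeline/{id}, with timeouts forwarded as query parameters (via withTimeout and withClusterManagerTimeout) where the request supports them.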

SearchPipelinesClientIT.java (new file)
@@ -0,0 +1,116 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client;

import org.opensearch.action.search.DeleteSearchPipelineRequest;
import org.opensearch.action.search.GetSearchPipelineRequest;
import org.opensearch.action.search.GetSearchPipelineResponse;
import org.opensearch.action.search.PutSearchPipelineRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.search.pipeline.Pipeline;

import java.io.IOException;

public class SearchPipelinesClientIT extends OpenSearchRestHighLevelClientTestCase {

public void testPutPipeline() throws IOException {
String id = "some_pipeline_id";
XContentBuilder pipelineBuilder = buildSearchPipeline();
PutSearchPipelineRequest request = new PutSearchPipelineRequest(
id,
BytesReference.bytes(pipelineBuilder),
pipelineBuilder.contentType()
);
createPipeline(request);
}

private static void createPipeline(PutSearchPipelineRequest request) throws IOException {
AcknowledgedResponse response = execute(
request,
highLevelClient().searchPipelines()::putPipeline,
highLevelClient().searchPipelines()::putPipelineAsync
);
assertTrue(response.isAcknowledged());
}

public void testGetPipeline() throws IOException {
String id = "some_pipeline_id";
XContentBuilder pipelineBuilder = buildSearchPipeline();
PutSearchPipelineRequest request = new PutSearchPipelineRequest(
id,
BytesReference.bytes(pipelineBuilder),
pipelineBuilder.contentType()
);
createPipeline(request);

GetSearchPipelineRequest getRequest = new GetSearchPipelineRequest(id);
GetSearchPipelineResponse response = execute(
getRequest,
highLevelClient().searchPipelines()::getPipeline,
highLevelClient().searchPipelines()::getPipelineAsync
);
assertTrue(response.isFound());
assertEquals(1, response.pipelines().size());
assertEquals(id, response.pipelines().get(0).getId());
}

public void testDeletePipeline() throws IOException {
String id = "some_pipeline_id";
XContentBuilder pipelineBuilder = buildSearchPipeline();
PutSearchPipelineRequest request = new PutSearchPipelineRequest(
id,
BytesReference.bytes(pipelineBuilder),
pipelineBuilder.contentType()
);
createPipeline(request);

DeleteSearchPipelineRequest deleteRequest = new DeleteSearchPipelineRequest(id);
AcknowledgedResponse response = execute(
deleteRequest,
highLevelClient().searchPipelines()::deletePipeline,
highLevelClient().searchPipelines()::deletePipelineAsync
);
assertTrue(response.isAcknowledged());
}

private static XContentBuilder buildSearchPipeline() throws IOException {
XContentType xContentType = randomFrom(XContentType.values());
XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent());
return buildSearchPipeline(pipelineBuilder);
}

private static XContentBuilder buildSearchPipeline(XContentBuilder builder) throws IOException {
builder.startObject();
{
builder.field("description", "a pipeline description");
builder.startArray(Pipeline.REQUEST_PROCESSORS_KEY);
{
builder.startObject().startObject("filter_query");
{
builder.startObject("query");
{
builder.startObject("term");
{
builder.field("field", "value");
}
builder.endObject();
}
builder.endObject();
}
builder.endObject().endObject();
}
builder.endArray();
}
builder.endObject();
return builder;
}
}
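
For reference, buildSearchPipeline above emits a body equivalent to the inline JSON used in the usage sketch earlier in this PR: a description plus a single filter_query request processor wrapping a term query on field/value, serialized in a randomly chosen XContentType.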