Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial search pipelines implementation #6587

Merged
merged 18 commits into from
Apr 10, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- The truncation limit of the OpenSearchJsonLayout logger is now configurable ([#6569](https://github.com/opensearch-project/OpenSearch/pull/6569))
- Add 'base_path' setting to File System Repository ([#6558](https://github.com/opensearch-project/OpenSearch/pull/6558))
- Return success on DeletePits when no PITs exist. ([#6544](https://github.com/opensearch-project/OpenSearch/pull/6544))
- Add initial search pipelines ([#6587](https://github.com/opensearch-project/OpenSearch/pull/6587))
- Add node repurpose command for search nodes ([#6517](https://github.com/opensearch-project/OpenSearch/pull/6517))
- Add wait_for_completion parameter to resize, open, and forcemerge APIs ([#6434](https://github.com/opensearch-project/OpenSearch/pull/6434))
- [Segment Replication] Apply backpressure when replicas fall behind ([#6563](https://github.com/opensearch-project/OpenSearch/pull/6563))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ public class RestHighLevelClient implements Closeable {
private final IngestClient ingestClient = new IngestClient(this);
private final SnapshotClient snapshotClient = new SnapshotClient(this);
private final TasksClient tasksClient = new TasksClient(this);
private final SearchPipelineClient searchPipelineClient = new SearchPipelineClient(this);

/**
* Creates a {@link RestHighLevelClient} given the low level {@link RestClientBuilder} that allows to build the
Expand Down Expand Up @@ -354,6 +355,10 @@ public final TasksClient tasks() {
return tasksClient;
}

public final SearchPipelineClient searchPipeline() {
return searchPipelineClient;
}

/**
* Executes a bulk request using the Bulk API.
* @param bulkRequest the request
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client;

import org.opensearch.action.ActionListener;
import org.opensearch.action.search.DeleteSearchPipelineRequest;
import org.opensearch.action.search.GetSearchPipelineRequest;
import org.opensearch.action.search.GetSearchPipelineResponse;
import org.opensearch.action.search.PutSearchPipelineRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;

import java.io.IOException;
import java.util.Collections;

import static java.util.Collections.emptySet;

public final class SearchPipelineClient {
private final RestHighLevelClient restHighLevelClient;

SearchPipelineClient(RestHighLevelClient restHighLevelClient) {
this.restHighLevelClient = restHighLevelClient;
}

/**
* Add a pipeline or update an existing pipeline.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public AcknowledgedResponse put(PutSearchPipelineRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(
request,
SearchPipelinesRequestConverters::putPipeline,
options,
AcknowledgedResponse::fromXContent,
emptySet()
);
}

/**
* Asynchronously add a pipeline or update an existing pipeline.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return cancellable that may be used to cancel the request
*/
public Cancellable putAsync(PutSearchPipelineRequest request, RequestOptions options, ActionListener<AcknowledgedResponse> listener) {
return restHighLevelClient.performRequestAsyncAndParseEntity(
request,
SearchPipelinesRequestConverters::putPipeline,
options,
AcknowledgedResponse::fromXContent,
listener,
emptySet()
);
}

/**
* Get existing pipelines.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public GetSearchPipelineResponse get(GetSearchPipelineRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(
request,
SearchPipelinesRequestConverters::getPipeline,
options,
GetSearchPipelineResponse::fromXContent,
Collections.singleton(404)
);
}

/**
* Asynchronously get existing pipelines.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return cancellable that may be used to cancel the request
*/
public Cancellable getAsync(
GetSearchPipelineRequest request,
RequestOptions options,
ActionListener<GetSearchPipelineResponse> listener
) {
return restHighLevelClient.performRequestAsyncAndParseEntity(
request,
SearchPipelinesRequestConverters::getPipeline,
options,
GetSearchPipelineResponse::fromXContent,
listener,
Collections.singleton(404)
);
}

/**
* Delete an existing pipeline.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public AcknowledgedResponse delete(DeleteSearchPipelineRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(
request,
SearchPipelinesRequestConverters::deletePipeline,
options,
AcknowledgedResponse::fromXContent,
emptySet()
);
}

/**
* Asynchronously delete an existing pipeline.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return cancellable that may be used to cancel the request
*/
public Cancellable deleteAsync(
DeleteSearchPipelineRequest request,
RequestOptions options,
ActionListener<AcknowledgedResponse> listener
) {
return restHighLevelClient.performRequestAsyncAndParseEntity(
request,
SearchPipelinesRequestConverters::deletePipeline,
options,
AcknowledgedResponse::fromXContent,
listener,
emptySet()
);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client;

import org.apache.hc.client5.http.classic.methods.HttpDelete;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.classic.methods.HttpPut;
import org.opensearch.action.search.DeleteSearchPipelineRequest;
import org.opensearch.action.search.GetSearchPipelineRequest;
import org.opensearch.action.search.PutSearchPipelineRequest;

import java.io.IOException;

final class SearchPipelinesRequestConverters {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
final class SearchPipelinesRequestConverters {
final class SearchPipelineRequestConverters {

?

private SearchPipelinesRequestConverters() {}

static Request putPipeline(PutSearchPipelineRequest putPipelineRequest) throws IOException {
String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_search/pipeline")
.addPathPart(putPipelineRequest.getId())
.build();
Request request = new Request(HttpPut.METHOD_NAME, endpoint);

RequestConverters.Params params = new RequestConverters.Params();
params.withTimeout(putPipelineRequest.timeout());
params.withClusterManagerTimeout(putPipelineRequest.clusterManagerNodeTimeout());
request.addParameters(params.asMap());
request.setEntity(RequestConverters.createEntity(putPipelineRequest, RequestConverters.REQUEST_BODY_CONTENT_TYPE));
return request;
}

static Request deletePipeline(DeleteSearchPipelineRequest deletePipelineRequest) {
String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_search/pipeline")
.addPathPart(deletePipelineRequest.getId())
.build();
Request request = new Request(HttpDelete.METHOD_NAME, endpoint);

RequestConverters.Params parameters = new RequestConverters.Params();
parameters.withTimeout(deletePipelineRequest.timeout());
parameters.withClusterManagerTimeout(deletePipelineRequest.clusterManagerNodeTimeout());
request.addParameters(parameters.asMap());
return request;
}

static Request getPipeline(GetSearchPipelineRequest getPipelineRequest) {
String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_search/pipeline")
.addCommaSeparatedPathParts(getPipelineRequest.getIds())
.build();
Request request = new Request(HttpGet.METHOD_NAME, endpoint);

RequestConverters.Params parameters = new RequestConverters.Params();
parameters.withClusterManagerTimeout(getPipelineRequest.clusterManagerNodeTimeout());
request.addParameters(parameters.asMap());
return request;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client;

import org.opensearch.action.search.DeleteSearchPipelineRequest;
import org.opensearch.action.search.GetSearchPipelineRequest;
import org.opensearch.action.search.GetSearchPipelineResponse;
import org.opensearch.action.search.PutSearchPipelineRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;

public class SearchPipelineClientIT extends OpenSearchRestHighLevelClientTestCase {

public void testPutPipeline() throws IOException {
String id = "some_pipeline_id";
XContentBuilder pipelineBuilder = buildSearchPipeline();
PutSearchPipelineRequest request = new PutSearchPipelineRequest(
id,
BytesReference.bytes(pipelineBuilder),
pipelineBuilder.contentType()
);
createPipeline(request);
}

private static void createPipeline(PutSearchPipelineRequest request) throws IOException {
AcknowledgedResponse response = execute(
request,
highLevelClient().searchPipeline()::put,
highLevelClient().searchPipeline()::putAsync
);
assertTrue(response.isAcknowledged());
}

public void testGetPipeline() throws IOException {
String id = "some_pipeline_id";
XContentBuilder pipelineBuilder = buildSearchPipeline();
PutSearchPipelineRequest request = new PutSearchPipelineRequest(
id,
BytesReference.bytes(pipelineBuilder),
pipelineBuilder.contentType()
);
createPipeline(request);

GetSearchPipelineRequest getRequest = new GetSearchPipelineRequest(id);
GetSearchPipelineResponse response = execute(
getRequest,
highLevelClient().searchPipeline()::get,
highLevelClient().searchPipeline()::getAsync
);
assertTrue(response.isFound());
assertEquals(1, response.pipelines().size());
assertEquals(id, response.pipelines().get(0).getId());
}

public void testDeletePipeline() throws IOException {
String id = "some_pipeline_id";
XContentBuilder pipelineBuilder = buildSearchPipeline();
PutSearchPipelineRequest request = new PutSearchPipelineRequest(
id,
BytesReference.bytes(pipelineBuilder),
pipelineBuilder.contentType()
);
createPipeline(request);

DeleteSearchPipelineRequest deleteRequest = new DeleteSearchPipelineRequest(id);
AcknowledgedResponse response = execute(
deleteRequest,
highLevelClient().searchPipeline()::delete,
highLevelClient().searchPipeline()::deleteAsync
);
assertTrue(response.isAcknowledged());
}

private static XContentBuilder buildSearchPipeline() throws IOException {
XContentType xContentType = randomFrom(XContentType.values());
XContentBuilder pipelineBuilder = XContentBuilder.builder(xContentType.xContent());
return buildSearchPipeline(pipelineBuilder);
}

private static XContentBuilder buildSearchPipeline(XContentBuilder builder) throws IOException {
builder.startObject();
{
builder.field("description", "a pipeline description");
builder.startArray("request_processors");
{
builder.startObject().startObject("filter_query");
{
builder.startObject("query");
{
builder.startObject("term");
{
builder.field("field", "value");
}
builder.endObject();
}
builder.endObject();
}
builder.endObject().endObject();
}
builder.endArray();
}
builder.endObject();
return builder;
}
}
43 changes: 43 additions & 0 deletions modules/search-pipeline-common/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

apply plugin: 'opensearch.yaml-rest-test'
apply plugin: 'opensearch.internal-cluster-test'

opensearchplugin {
description 'Module for search pipeline processors that do not require additional security permissions or have large dependencies and resources'
classname 'org.opensearch.search.pipeline.common.SearchPipelineCommonModulePlugin'
}

dependencies {
}

restResources {
restApi {
includeCore '_common', 'search_pipeline', 'cluster', 'indices', 'index', 'nodes', 'search'
}
}

testClusters.all {
}

thirdPartyAudit.ignoreMissingClasses(
// from log4j
'org.osgi.framework.AdaptPermission',
'org.osgi.framework.AdminPermission',
'org.osgi.framework.Bundle',
'org.osgi.framework.BundleActivator',
'org.osgi.framework.BundleContext',
'org.osgi.framework.BundleEvent',
'org.osgi.framework.SynchronousBundleListener',
'org.osgi.framework.wiring.BundleWire',
'org.osgi.framework.wiring.BundleWiring'
)
Loading