From cf22a8d7f7875e1ac30e176a67cd7a754bc2728d Mon Sep 17 00:00:00 2001 From: Peter Nied Date: Thu, 1 Feb 2024 20:39:38 +0000 Subject: [PATCH] Basic pattern for decoupled views in metadata vs transport requests Signed-off-by: Peter Nied --- .../org/opensearch/action/ActionModule.java | 4 + .../admin/indices/view/CreateViewAction.java | 193 ++++++++++++++++++ .../admin/indices/view/package-info.java | 10 + .../org/opensearch/cluster/metadata/View.java | 11 +- .../cluster/metadata/ViewService.java | 54 +++++ .../main/java/org/opensearch/node/Node.java | 9 +- .../cluster/metadata/ViewTests.java | 3 - 7 files changed, 273 insertions(+), 11 deletions(-) create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/view/CreateViewAction.java create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/view/package-info.java create mode 100644 server/src/main/java/org/opensearch/cluster/metadata/ViewService.java diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 46775466aa615..e2a738ac959a3 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -224,6 +224,7 @@ import org.opensearch.action.admin.indices.upgrade.post.UpgradeSettingsAction; import org.opensearch.action.admin.indices.validate.query.TransportValidateQueryAction; import org.opensearch.action.admin.indices.validate.query.ValidateQueryAction; +import org.opensearch.action.admin.indices.view.CreateViewAction; import org.opensearch.action.bulk.BulkAction; import org.opensearch.action.bulk.TransportBulkAction; import org.opensearch.action.bulk.TransportShardBulkAction; @@ -721,6 +722,9 @@ public void reg actions.register(ResolveIndexAction.INSTANCE, ResolveIndexAction.TransportAction.class); actions.register(DataStreamsStatsAction.INSTANCE, DataStreamsStatsAction.TransportAction.class); + // Views: + actions.register(CreateViewAction.INSTANCE, CreateViewAction.TransportAction.class); + // Persistent tasks: actions.register(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class); actions.register(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class); diff --git a/server/src/main/java/org/opensearch/action/admin/indices/view/CreateViewAction.java b/server/src/main/java/org/opensearch/action/admin/indices/view/CreateViewAction.java new file mode 100644 index 0000000000000..10d84bd832229 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/view/CreateViewAction.java @@ -0,0 +1,193 @@ +package org.opensearch.action.admin.indices.view; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.ActionType; +import org.opensearch.action.ValidateActions; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest; +import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.metadata.ViewService; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.core.common.Strings; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +/** Action to create a view */ +public class CreateViewAction extends ActionType { + + public static final CreateViewAction INSTANCE = new CreateViewAction(); + public static final String NAME = "cluster:views:create"; + + private CreateViewAction() { + super(NAME, CreateViewAction.Response::new); + } + + + /** View target representation for create requests */ + public static class ViewTarget implements Writeable { + public final String indexPattern; + + public ViewTarget(final String indexPattern) { + this.indexPattern = indexPattern; + } + + public ViewTarget(final StreamInput in) throws IOException { + this.indexPattern = in.readString(); + } + + public String getIndexPattern() { + return indexPattern; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(indexPattern); + } + + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + + if (Strings.isNullOrEmpty(indexPattern)) { + validationException = ValidateActions.addValidationError("index pattern cannot be empty or null", validationException); + } + + return validationException; + } + + } + + /** + * Request for Creating View + */ + public static class Request extends ClusterManagerNodeRequest { + private final String name; + private final String description; + private final List targets; + + public Request(final String name, final String description, final List targets) { + this.name = name; + this.description = description; + this.targets = targets; + } + + public String getName() { + return name; + } + + public String getDescription() { + return description; + } + + public List getTargets() { + return new ArrayList<>(targets); + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (Strings.isNullOrEmpty(name)) { + validationException = ValidateActions.addValidationError("Name is cannot be empty or null", validationException); + } + if (targets.isEmpty()) { + validationException = ValidateActions.addValidationError("targets cannot be empty", validationException); + } + + for (final ViewTarget target : targets) { + validationException = target.validate(); + } + + return validationException; + } + + public Request(final StreamInput in) throws IOException { + super(in); + this.name = in.readString(); + this.description = in.readString(); + this.targets = in.readList(ViewTarget::new); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(name); + out.writeString(description); + out.writeList(targets); + } + } + + /** Response after view is created */ + public static class Response extends ActionResponse { + + private final org.opensearch.cluster.metadata.View createdView; + + public Response(final org.opensearch.cluster.metadata.View createdView) { + this.createdView = createdView; + } + + public Response(final StreamInput in) throws IOException { + super(in); + this.createdView = new org.opensearch.cluster.metadata.View(in); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + this.createdView.writeTo(out); + } + } + + /** + * Transport Action for creating a View + */ + public static class TransportAction extends TransportClusterManagerNodeAction { + + private final ViewService viewService; + + @Inject + public TransportAction( + final TransportService transportService, + final ClusterService clusterService, + final ThreadPool threadPool, + final ActionFilters actionFilters, + final IndexNameExpressionResolver indexNameExpressionResolver, + final ViewService viewService + ) { + super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver); + this.viewService = viewService; + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected Response read(StreamInput in) throws IOException { + return new Response(in); + } + + @Override + protected void clusterManagerOperation(Request request, ClusterState state, ActionListener listener) + throws Exception { + viewService.createView(request, listener); + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/view/package-info.java b/server/src/main/java/org/opensearch/action/admin/indices/view/package-info.java new file mode 100644 index 0000000000000..db0556b1bf334 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/view/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** View transport handlers. */ +package org.opensearch.action.admin.indices.view; diff --git a/server/src/main/java/org/opensearch/cluster/metadata/View.java b/server/src/main/java/org/opensearch/cluster/metadata/View.java index 26678b9eb123c..8db65c6afaebe 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/View.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/View.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.List; +import java.util.Objects; /** TODO */ @ExperimentalApi @@ -34,11 +35,11 @@ public class View extends AbstractDiffable implements ToXContentObject { public final List targets; public View(final String name, final String description, final Long createdAt, final Long modifiedAt, final List targets) { - this.name = name; + this.name = Objects.requireNonNull(name, "Name must be provided"); this.description = description; this.createdAt = createdAt != null ? createdAt : -1; this.modifiedAt = modifiedAt != null ? modifiedAt : -1; - this.targets = targets; + this.targets = Objects.requireNonNull(targets, "Targets are required on a view"); } public View(final StreamInput in) throws IOException { @@ -55,11 +56,11 @@ public static class Target implements Writeable, ToXContentObject { public final String indexPattern; - Target(final String indexPattern) { - this.indexPattern = indexPattern; + public Target(final String indexPattern) { + this.indexPattern = Objects.requireNonNull(indexPattern, "IndexPattern is required"); } - private Target(final StreamInput in) throws IOException { + public Target(final StreamInput in) throws IOException { this(in.readString()); } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/ViewService.java b/server/src/main/java/org/opensearch/cluster/metadata/ViewService.java new file mode 100644 index 0000000000000..4056e477f2226 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/metadata/ViewService.java @@ -0,0 +1,54 @@ +package org.opensearch.cluster.metadata; + +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.indices.view.CreateViewAction; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.ClusterStateUpdateTask; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.core.action.ActionListener; + +/** Service to interact with views, create, retrieve, update, and delete */ +public class ViewService { + + private final static Logger LOG = LogManager.getLogger(ViewService.class); + private final ClusterService clusterService; + + public ViewService(final ClusterService clusterService) { + this.clusterService = clusterService; + } + + public void createView(final CreateViewAction.Request request, final ActionListener listener) { + final long currentTime = System.currentTimeMillis(); + + final List targets = request.getTargets() + .stream() + .map(target -> new View.Target(target.getIndexPattern())) + .collect(Collectors.toList()); + final View view = new View(request.getName(), request.getDescription(), currentTime, currentTime, targets); + + clusterService.submitStateUpdateTask("create_view_task", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(final ClusterState currentState) throws Exception { + return new ClusterState.Builder(clusterService.state()).metadata(Metadata.builder(currentState.metadata()).put(view)) + .build(); + } + + @Override + public void onFailure(final String source, final Exception e) { + LOG.error("Unable to create view, due to {}", source, e); + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) { + final View createdView = newState.getMetadata().views().get(request.getName()); + final CreateViewAction.Response response = new CreateViewAction.Response(createdView); + listener.onResponse(response); + } + }); + } +} diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index d9c139be9f915..d26a7deae9e77 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -72,6 +72,7 @@ import org.opensearch.cluster.metadata.MetadataIndexUpgradeService; import org.opensearch.cluster.metadata.SystemIndexMetadataUpgradeService; import org.opensearch.cluster.metadata.TemplateUpgradeService; +import org.opensearch.cluster.metadata.ViewService; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.routing.BatchedRerouteService; @@ -862,6 +863,10 @@ protected Node( metadataCreateIndexService ); + final ViewService viewService = new ViewService( + clusterService + ); + Collection pluginComponents = pluginsService.filterPlugins(Plugin.class) .stream() .flatMap( @@ -898,9 +903,6 @@ protected Node( ); modules.add(actionModule); - actionModule.getRestController().registerHandler(new RestViewAction(clusterService)); - actionModule.getRestController().registerHandler(new RestViewSearchAction(clusterService)); - final RestController restController = actionModule.getRestController(); final NodeResourceUsageTracker nodeResourceUsageTracker = new NodeResourceUsageTracker( @@ -1222,6 +1224,7 @@ protected Node( b.bind(MetadataCreateIndexService.class).toInstance(metadataCreateIndexService); b.bind(AwarenessReplicaBalance.class).toInstance(awarenessReplicaBalance); b.bind(MetadataCreateDataStreamService.class).toInstance(metadataCreateDataStreamService); + b.bind(ViewService.class).toInstance(viewService); b.bind(SearchService.class).toInstance(searchService); b.bind(SearchTransportService.class).toInstance(searchTransportService); b.bind(SearchPhaseController.class) diff --git a/server/src/test/java/org/opensearch/cluster/metadata/ViewTests.java b/server/src/test/java/org/opensearch/cluster/metadata/ViewTests.java index 4eab59d32ca0c..c1184ddeca915 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/ViewTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/ViewTests.java @@ -82,7 +82,6 @@ public void testNullTargetIndexPattern() { assertThat(npe.getMessage(), equalTo("IndexPattern is required")); } - public void testDefaultValues() { final View view = new View("myName", null, null, null, List.of()); @@ -92,6 +91,4 @@ public void testDefaultValues() { assertThat(view.modifiedAt, equalTo(-1L)); assertThat(view.targets, empty()); } - - }