Skip to content

Commit

Permalink
[Transform] Encapsulate plugin-level settings into a `TransformExtens…
Browse files Browse the repository at this point in the history
…ion` interface (elastic#98978)
  • Loading branch information
przemekwitek authored Aug 30, 2023
1 parent d3f799c commit f9e4fa9
Show file tree
Hide file tree
Showing 13 changed files with 182 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.transform;

import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;

public class DefaultTransformExtension implements TransformExtension {

@Override
public boolean includeNodeInfo() {
return true;
}

@Override
public Settings getTransformInternalIndexAdditionalSettings() {
return Settings.EMPTY;
}

/**
* Provides destination index settings, hardcoded at the moment. In future this might be customizable or generation could be based on
* source settings.
*/
@Override
public Settings getTransformDestinationIndexSettings() {
return Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa

private final Settings settings;
private final SetOnce<TransformServices> transformServices = new SetOnce<>();
private final TransformExtension transformExtension = new DefaultTransformExtension();

public static final Integer DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE = Integer.valueOf(500);
public static final TimeValue DEFAULT_TRANSFORM_FREQUENCY = TimeValue.timeValueSeconds(60);
Expand Down Expand Up @@ -250,7 +251,12 @@ public Collection<Object> createComponents(
client,
xContentRegistry
);
TransformAuditor auditor = new TransformAuditor(client, clusterService.getNodeName(), clusterService, includeNodeInfo());
TransformAuditor auditor = new TransformAuditor(
client,
clusterService.getNodeName(),
clusterService,
getTransformExtension().includeNodeInfo()
);
Clock clock = Clock.systemUTC();
TransformCheckpointService checkpointService = new TransformCheckpointService(
clock,
Expand All @@ -264,7 +270,11 @@ public Collection<Object> createComponents(

transformServices.set(new TransformServices(configManager, checkpointService, auditor, scheduler));

return Arrays.asList(transformServices.get(), new TransformClusterStateListener(clusterService, client));
return Arrays.asList(
transformServices.get(),
new TransformClusterStateListener(clusterService, client),
new TransformExtensionHolder(getTransformExtension())
);
}

@Override
Expand All @@ -285,7 +295,7 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
threadPool,
clusterService,
settingsModule.getSettings(),
getTransformInternalIndexAdditionalSettings(),
getTransformExtension().getTransformInternalIndexAdditionalSettings(),
expressionResolver
)
);
Expand Down Expand Up @@ -354,7 +364,9 @@ public UnaryOperator<Map<String, IndexTemplateMetadata>> getIndexTemplateMetadat
@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
try {
return List.of(TransformInternalIndex.getSystemIndexDescriptor(getTransformInternalIndexAdditionalSettings()));
return List.of(
TransformInternalIndex.getSystemIndexDescriptor(getTransformExtension().getTransformInternalIndexAdditionalSettings())
);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
Expand Down Expand Up @@ -467,11 +479,17 @@ public String getFeatureDescription() {
return "Manages configuration and state for transforms";
}

public TransformExtension getTransformExtension() {
return transformExtension;
}

@Deprecated
public boolean includeNodeInfo() {
return true;
return getTransformExtension().includeNodeInfo();
}

@Deprecated
public Settings getTransformInternalIndexAdditionalSettings() {
return Settings.EMPTY;
return getTransformExtension().getTransformInternalIndexAdditionalSettings();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.transform;

import org.elasticsearch.common.settings.Settings;

public interface TransformExtension {

boolean includeNodeInfo();

Settings getTransformInternalIndexAdditionalSettings();

/**
* Provides destination index settings, hardcoded at the moment. In future this might be customizable or generation could be based on
* source settings.
*/
Settings getTransformDestinationIndexSettings();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.transform;

import org.elasticsearch.node.Node;

import java.util.Objects;

/**
* Wrapper for the {@link TransformExtension} interface that allows it to be used
* given the way {@link Node} does Guice bindings for plugin components.
* TODO: remove this class entirely once Guice is removed entirely.
*/
public class TransformExtensionHolder {

private final TransformExtension transformExtension;

/**
* Used by Guice.
*/
public TransformExtensionHolder() {
this.transformExtension = null;
}

public TransformExtensionHolder(TransformExtension transformExtension) {
this.transformExtension = Objects.requireNonNull(transformExtension);
}

public boolean isEmpty() {
return transformExtension == null;
}

public TransformExtension getTransformExtension() {
return transformExtension;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public static void updateTransform(
final boolean dryRun,
final boolean checkAccess,
final TimeValue timeout,
final Settings destIndexSettings,
ActionListener<UpdateResult> listener
) {
// rewrite config into a new format if necessary
Expand Down Expand Up @@ -185,6 +186,7 @@ public static void updateTransform(
destIndexMappings,
seqNoPrimaryTermAndIndex,
clusterState,
destIndexSettings,
ActionListener.wrap(r -> updateTransformListener.onResponse(null), listener::onFailure)
);
}, listener::onFailure);
Expand Down Expand Up @@ -300,6 +302,7 @@ private static void updateTransformConfiguration(
Map<String, String> mappings,
SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex,
ClusterState clusterState,
Settings destIndexSettings,
ActionListener<Void> listener
) {
// <3> Return to the listener
Expand Down Expand Up @@ -351,6 +354,7 @@ private static void updateTransformConfiguration(
indexNameExpressionResolver,
clusterState,
config,
destIndexSettings,
mappings,
createDestinationListener
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.elasticsearch.xpack.core.transform.transforms.SyncConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformDestIndexSettings;
import org.elasticsearch.xpack.transform.TransformExtensionHolder;
import org.elasticsearch.xpack.transform.persistence.TransformIndex;
import org.elasticsearch.xpack.transform.transforms.Function;
import org.elasticsearch.xpack.transform.transforms.FunctionFactory;
Expand Down Expand Up @@ -78,6 +79,7 @@ public class TransportPreviewTransformAction extends HandledTransportAction<Requ
private final TransportService transportService;
private final Settings nodeSettings;
private final SourceDestValidator sourceDestValidator;
private final Settings destIndexSettings;

@Inject
public TransportPreviewTransformAction(
Expand All @@ -88,7 +90,8 @@ public TransportPreviewTransformAction(
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService,
Settings settings,
IngestService ingestService
IngestService ingestService,
TransformExtensionHolder transformExtensionHolder
) {
super(PreviewTransformAction.NAME, transportService, actionFilters, Request::new);
this.securityContext = XPackSettings.SECURITY_ENABLED.get(settings)
Expand All @@ -111,6 +114,7 @@ public TransportPreviewTransformAction(
clusterService.getNodeName(),
License.OperationMode.BASIC.description()
);
this.destIndexSettings = transformExtensionHolder.getTransformExtension().getTransformDestinationIndexSettings();
}

@Override
Expand Down Expand Up @@ -236,6 +240,7 @@ private void getPreview(
HeaderWarning.addWarning("Pipeline returned " + errors.size() + " errors, first error: " + errors.get(0));
}
TransformDestIndexSettings generatedDestIndexSettings = TransformIndex.createTransformDestIndexSettings(
destIndexSettings,
mappings.get(),
transformId,
Clock.systemUTC()
Expand All @@ -249,6 +254,7 @@ private void getPreview(
ActionListener<List<Map<String, Object>>> previewListener = ActionListener.wrap(docs -> {
if (pipeline == null) {
TransformDestIndexSettings generatedDestIndexSettings = TransformIndex.createTransformDestIndexSettings(
destIndexSettings,
mappings.get(),
transformId,
Clock.systemUTC()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.xpack.core.transform.action.StopTransformAction;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdate;
import org.elasticsearch.xpack.transform.TransformExtensionHolder;
import org.elasticsearch.xpack.transform.TransformServices;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
Expand All @@ -58,6 +59,7 @@ public class TransportResetTransformAction extends AcknowledgedTransportMasterNo
private final Client client;
private final SecurityContext securityContext;
private final Settings settings;
private final Settings destIndexSettings;

@Inject
public TransportResetTransformAction(
Expand All @@ -68,7 +70,8 @@ public TransportResetTransformAction(
IndexNameExpressionResolver indexNameExpressionResolver,
TransformServices transformServices,
Client client,
Settings settings
Settings settings,
TransformExtensionHolder transformExtensionHolder
) {
super(
ResetTransformAction.NAME,
Expand All @@ -87,6 +90,7 @@ public TransportResetTransformAction(
? new SecurityContext(settings, threadPool.getThreadContext())
: null;
this.settings = settings;
this.destIndexSettings = transformExtensionHolder.getTransformExtension().getTransformDestinationIndexSettings();
}

@Override
Expand Down Expand Up @@ -131,6 +135,7 @@ protected void masterOperation(Task task, Request request, ClusterState state, A
false, // dry run
false, // check access
request.timeout(),
destIndexSettings,
updateTransformListener
);
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.health.HealthStatus;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
Expand All @@ -40,6 +41,7 @@
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
import org.elasticsearch.xpack.transform.TransformExtensionHolder;
import org.elasticsearch.xpack.transform.TransformServices;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.AuthorizationStatePersistenceUtils;
Expand All @@ -62,6 +64,7 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
private final PersistentTasksService persistentTasksService;
private final Client client;
private final TransformAuditor auditor;
private final Settings destIndexSettings;

@Inject
public TransportStartTransformAction(
Expand All @@ -72,7 +75,8 @@ public TransportStartTransformAction(
IndexNameExpressionResolver indexNameExpressionResolver,
TransformServices transformServices,
PersistentTasksService persistentTasksService,
Client client
Client client,
TransformExtensionHolder transformExtensionHolder
) {
this(
StartTransformAction.NAME,
Expand All @@ -83,7 +87,8 @@ public TransportStartTransformAction(
indexNameExpressionResolver,
transformServices,
persistentTasksService,
client
client,
transformExtensionHolder
);
}

Expand All @@ -96,7 +101,8 @@ protected TransportStartTransformAction(
IndexNameExpressionResolver indexNameExpressionResolver,
TransformServices transformServices,
PersistentTasksService persistentTasksService,
Client client
Client client,
TransformExtensionHolder transformExtensionHolder
) {
super(
name,
Expand All @@ -113,6 +119,7 @@ protected TransportStartTransformAction(
this.persistentTasksService = persistentTasksService;
this.client = client;
this.auditor = transformServices.getAuditor();
this.destIndexSettings = transformExtensionHolder.getTransformExtension().getTransformDestinationIndexSettings();
}

@Override
Expand Down Expand Up @@ -191,6 +198,7 @@ protected void masterOperation(
indexNameExpressionResolver,
state,
transformConfigHolder.get(),
destIndexSettings,
validationResponse.getDestIndexMappings(),
createOrGetIndexListener
);
Expand Down
Loading

0 comments on commit f9e4fa9

Please sign in to comment.