Skip to content

Commit

Permalink
[1.3] Conditionally serialize with ODFE package if min version of nod…
Browse files Browse the repository at this point in the history
…e in cluster is <OS 1.0.0 (#2268)

* Conditionally serialize based on minNodeVersion

Signed-off-by: Craig Perkins <cwperx@amazon.com>
Co-authored-by: Peter Nied <petern@amazon.com>
  • Loading branch information
cwperks and peternied committed Nov 28, 2022
1 parent 983385f commit 00e40e4
Show file tree
Hide file tree
Showing 11 changed files with 170 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ public final class OpenSearchSecurityPlugin extends OpenSearchSecuritySSLPlugin
private volatile DlsFlsRequestValve dlsFlsValve = null;
private volatile Salt salt;
private volatile OpensearchDynamicSetting<Boolean> transportPassiveAuthSetting;
private final ClusterInfoHolder cih = new ClusterInfoHolder();

public static boolean isActionTraceEnabled() {
return actionTrace.isTraceEnabled();
Expand Down Expand Up @@ -453,7 +454,7 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
handlers.addAll(super.getRestHandlers(settings, restController, clusterSettings, indexScopedSettings, settingsFilter, indexNameExpressionResolver, nodesInCluster));

if(!SSLConfig.isSslOnlyMode()) {
handlers.add(new SecurityInfoAction(settings, restController, Objects.requireNonNull(evaluator), Objects.requireNonNull(threadPool)));
handlers.add(new SecurityInfoAction(settings, restController, Objects.requireNonNull(evaluator), Objects.requireNonNull(threadPool), this.cih));
handlers.add(new SecurityHealthAction(settings, restController, Objects.requireNonNull(backendRegistry)));
handlers.add(new SecuritySSLCertsInfoAction(settings, restController, sks, Objects.requireNonNull(threadPool), Objects.requireNonNull(adminDns)));
handlers.add(new DashboardsInfoAction(settings, restController, Objects.requireNonNull(evaluator), Objects.requireNonNull(threadPool)));
Expand Down Expand Up @@ -752,12 +753,11 @@ public Collection<Object> createComponents(Client localClient, ClusterService cl
//Register opensearch dynamic settings
transportPassiveAuthSetting.registerClusterSettingsChangeListener(clusterService.getClusterSettings());

final ClusterInfoHolder cih = new ClusterInfoHolder();
this.cs.addListener(cih);
this.cs.addListener(this.cih);
this.salt = Salt.from(settings);

final IndexNameExpressionResolver resolver = new IndexNameExpressionResolver(threadPool.getThreadContext());
irr = new IndexResolverReplacer(resolver, clusterService, cih);
irr = new IndexResolverReplacer(resolver, clusterService, this.cih);

final String DEFAULT_INTERCLUSTER_REQUEST_EVALUATOR_CLASS = DefaultInterClusterRequestEvaluator.class.getName();
InterClusterRequestEvaluator interClusterRequestEvaluator = new DefaultInterClusterRequestEvaluator(settings);
Expand Down Expand Up @@ -795,9 +795,9 @@ public Collection<Object> createComponents(Client localClient, ClusterService cl
// DLS-FLS is enabled if not client and not disabled and not SSL only.
final boolean dlsFlsEnabled = !SSLConfig.isSslOnlyMode();
evaluator = new PrivilegesEvaluator(clusterService, threadPool, cr, resolver, auditLog,
settings, privilegesInterceptor, cih, irr, dlsFlsEnabled);
settings, privilegesInterceptor, this.cih, irr, dlsFlsEnabled);

sf = new SecurityFilter(localClient, settings, evaluator, adminDns, dlsFlsValve, auditLog, threadPool, cs, compatConfig, irr, backendRegistry);
sf = new SecurityFilter(localClient, settings, evaluator, adminDns, dlsFlsValve, auditLog, threadPool, cs, compatConfig, irr, backendRegistry, this.cih);

final String principalExtractorClass = settings.get(SSLConfigConstants.SECURITY_SSL_TRANSPORT_PRINCIPAL_EXTRACTOR_CLASS, null);

Expand All @@ -810,7 +810,7 @@ public Collection<Object> createComponents(Client localClient, ClusterService cl
securityRestHandler = new SecurityRestFilter(backendRegistry, auditLog, threadPool,
principalExtractor, settings, configPath, compatConfig);

final DynamicConfigFactory dcf = new DynamicConfigFactory(cr, settings, configPath, localClient, threadPool, cih);
final DynamicConfigFactory dcf = new DynamicConfigFactory(cr, settings, configPath, localClient, threadPool, this.cih);
dcf.registerDCFListener(backendRegistry);
dcf.registerDCFListener(compatConfig);
dcf.registerDCFListener(irr);
Expand All @@ -825,7 +825,7 @@ public Collection<Object> createComponents(Client localClient, ClusterService cl
cr.setDynamicConfigFactory(dcf);

si = new SecurityInterceptor(settings, threadPool, backendRegistry, auditLog, principalExtractor,
interClusterRequestEvaluator, cs, Objects.requireNonNull(sslExceptionHandler), Objects.requireNonNull(cih), SSLConfig);
interClusterRequestEvaluator, cs, Objects.requireNonNull(sslExceptionHandler), Objects.requireNonNull(this.cih), SSLConfig);
components.add(principalExtractor);

// NOTE: We need to create DefaultInterClusterRequestEvaluator before creating ConfigurationRepository since the latter requires security index to be accessible which means
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateListener;
Expand All @@ -49,13 +50,22 @@ public class ClusterInfoHolder implements ClusterStateListener {
protected final Logger log = LogManager.getLogger(this.getClass());
private volatile Boolean has6xNodes = null;
private volatile Boolean has6xIndices = null;

private volatile Boolean hasOdfeNodes = null;
private volatile DiscoveryNodes nodes = null;
private volatile Boolean isLocalNodeElectedMaster = null;
private volatile boolean initialized;

@Override
public void clusterChanged(ClusterChangedEvent event) {
final boolean isTraceEnabled = log.isTraceEnabled();
if(hasOdfeNodes == null || event.nodesChanged()) {
hasOdfeNodes = Boolean.valueOf(clusterHasOdfeNodes(event.state()));
if (isTraceEnabled) {
log.trace("hasOdfeNodes: {}", hasOdfeNodes);
}
}

if(has6xNodes == null || event.nodesChanged()) {
has6xNodes = Boolean.valueOf(clusterHas6xNodes(event.state()));
if (isTraceEnabled) {
Expand Down Expand Up @@ -91,6 +101,10 @@ public Boolean getHas6xIndices() {
return has6xIndices;
}

public Boolean getHasOdfeNodes() {
return hasOdfeNodes;
}

public Boolean isLocalNodeElectedMaster() {
return isLocalNodeElectedMaster;
}
Expand Down Expand Up @@ -124,4 +138,8 @@ private static boolean clusterHas6xIndices(ClusterState state) {
}
return false;
}

private static boolean clusterHasOdfeNodes(ClusterState state) {
return state.nodes().getMinNodeVersion().before(Version.V_1_0_0);
}
}
17 changes: 14 additions & 3 deletions src/main/java/org/opensearch/security/filter/SecurityFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,15 @@
package org.opensearch.security.filter;

import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

import org.apache.kafka.common.Cluster;
import org.opensearch.Version;
import org.opensearch.security.auth.RolesInjector;
import org.opensearch.security.configuration.ClusterInfoHolder;
import org.opensearch.security.resolver.IndexResolverReplacer;
import org.opensearch.security.support.WildcardMatcher;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -94,6 +98,7 @@
import org.opensearch.threadpool.ThreadPool;

import org.opensearch.security.support.Base64Helper;
import org.opensearch.security.support.Base64Helper.PackageBehavior;
import org.opensearch.security.support.ConfigConstants;
import org.opensearch.security.support.HeaderHelper;
import org.opensearch.security.support.SourceFieldsContext;
Expand All @@ -118,9 +123,11 @@ public class SecurityFilter implements ActionFilter {
private final Client client;
private final BackendRegistry backendRegistry;

private final ClusterInfoHolder clusterInfoHolder;

public SecurityFilter(final Client client, final Settings settings, final PrivilegesEvaluator evalp, final AdminDNs adminDns,
DlsFlsRequestValve dlsFlsValve, AuditLog auditLog, ThreadPool threadPool, ClusterService cs,
final CompatConfig compatConfig, final IndexResolverReplacer indexResolverReplacer, BackendRegistry backendRegistry) {
final CompatConfig compatConfig, final IndexResolverReplacer indexResolverReplacer, BackendRegistry backendRegistry, ClusterInfoHolder cih) {
this.client = client;
this.evalp = evalp;
this.adminDns = adminDns;
Expand All @@ -133,6 +140,7 @@ public SecurityFilter(final Client client, final Settings settings, final Privil
this.immutableIndicesMatcher = WildcardMatcher.from(settings.getAsList(ConfigConstants.SECURITY_COMPLIANCE_IMMUTABLE_INDICES, Collections.emptyList()));
this.rolesInjector = new RolesInjector(auditLog);
this.backendRegistry = backendRegistry;
this.clusterInfoHolder = cih;
log.info("{} indices are made immutable.", immutableIndicesMatcher);
}

Expand Down Expand Up @@ -386,15 +394,18 @@ private static boolean isUserAdmin(User user, final AdminDNs adminDns) {
}

private void attachSourceFieldContext(ActionRequest request) {

Boolean hasOdfeNodes = clusterInfoHolder.getHasOdfeNodes();
PackageBehavior packageBehavior = (hasOdfeNodes == null || hasOdfeNodes) ? PackageBehavior.REWRITE_AS_ODFE : PackageBehavior.NONE;

if(request instanceof SearchRequest && SourceFieldsContext.isNeeded((SearchRequest) request)) {
if(threadContext.getHeader("_opendistro_security_source_field_context") == null) {
final String serializedSourceFieldContext = Base64Helper.serializeObject(new SourceFieldsContext((SearchRequest) request));
final String serializedSourceFieldContext = Base64Helper.serializeObject(new SourceFieldsContext((SearchRequest) request), packageBehavior);
threadContext.putHeader("_opendistro_security_source_field_context", serializedSourceFieldContext);
}
} else if (request instanceof GetRequest && SourceFieldsContext.isNeeded((GetRequest) request)) {
if(threadContext.getHeader("_opendistro_security_source_field_context") == null) {
final String serializedSourceFieldContext = Base64Helper.serializeObject(new SourceFieldsContext((GetRequest) request));
final String serializedSourceFieldContext = Base64Helper.serializeObject(new SourceFieldsContext((GetRequest) request), packageBehavior);
threadContext.putHeader("_opendistro_security_source_field_context", serializedSourceFieldContext);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.security.configuration.ClusterInfoHolder;
import org.opensearch.security.securityconf.SecurityRoles;
import org.opensearch.threadpool.ThreadPool;

import org.opensearch.security.resolver.IndexResolverReplacer.Resolved;
import org.opensearch.security.support.Base64Helper;
import org.opensearch.security.support.Base64Helper.PackageBehavior;
import org.opensearch.security.support.ConfigConstants;
import org.opensearch.security.support.WildcardMatcher;
import org.opensearch.security.support.HeaderHelper;
Expand All @@ -67,18 +69,21 @@ public DlsFlsEvaluator(Settings settings, ThreadPool threadPool) {
}

public PrivilegesEvaluatorResponse evaluate(final ActionRequest request, final ClusterService clusterService, final IndexNameExpressionResolver resolver, final Resolved requestedResolved, final User user,
final SecurityRoles securityRoles, final PrivilegesEvaluatorResponse presponse) {
final SecurityRoles securityRoles, final PrivilegesEvaluatorResponse presponse, final ClusterInfoHolder clusterInfoHolder) {

ThreadContext threadContext = threadPool.getThreadContext();

Boolean hasOdfeNodes = clusterInfoHolder.getHasOdfeNodes();
PackageBehavior packageBehavior = (hasOdfeNodes == null || hasOdfeNodes) ? PackageBehavior.REWRITE_AS_ODFE : PackageBehavior.NONE;

// maskedFields
final Map<String, Set<String>> maskedFieldsMap = securityRoles.getMaskedFields(user, resolver, clusterService);
final boolean isDebugEnabled = log.isDebugEnabled();

if (maskedFieldsMap != null && !maskedFieldsMap.isEmpty()) {

if(request instanceof ClusterSearchShardsRequest && HeaderHelper.isTrustedClusterRequest(threadContext)) {
threadContext.addResponseHeader(ConfigConstants.OPENDISTRO_SECURITY_MASKED_FIELD_HEADER, Base64Helper.serializeObject((Serializable) maskedFieldsMap));
threadContext.addResponseHeader(ConfigConstants.OPENDISTRO_SECURITY_MASKED_FIELD_HEADER, Base64Helper.serializeObject((Serializable) maskedFieldsMap, packageBehavior));
if (isDebugEnabled) {
log.debug("Added response header for masked fields info: {}", maskedFieldsMap);
}
Expand All @@ -92,7 +97,7 @@ public PrivilegesEvaluatorResponse evaluate(final ActionRequest request, final C
}
}
} else {
threadContext.putHeader(ConfigConstants.OPENDISTRO_SECURITY_MASKED_FIELD_HEADER, Base64Helper.serializeObject((Serializable) maskedFieldsMap));
threadContext.putHeader(ConfigConstants.OPENDISTRO_SECURITY_MASKED_FIELD_HEADER, Base64Helper.serializeObject((Serializable) maskedFieldsMap, packageBehavior));
if (isDebugEnabled) {
log.debug("Attach masked fields info: {}", maskedFieldsMap);
}
Expand All @@ -116,7 +121,7 @@ public PrivilegesEvaluatorResponse evaluate(final ActionRequest request, final C
if (!dlsQueries.isEmpty()) {

if(request instanceof ClusterSearchShardsRequest && HeaderHelper.isTrustedClusterRequest(threadContext)) {
threadContext.addResponseHeader(ConfigConstants.OPENDISTRO_SECURITY_DLS_QUERY_HEADER, Base64Helper.serializeObject((Serializable) dlsQueries));
threadContext.addResponseHeader(ConfigConstants.OPENDISTRO_SECURITY_DLS_QUERY_HEADER, Base64Helper.serializeObject((Serializable) dlsQueries, packageBehavior));
if (isDebugEnabled) {
log.debug("Added response header for DLS info: {}", dlsQueries);
}
Expand All @@ -126,7 +131,7 @@ public PrivilegesEvaluatorResponse evaluate(final ActionRequest request, final C
throw new OpenSearchSecurityException(ConfigConstants.OPENDISTRO_SECURITY_DLS_QUERY_HEADER + " does not match (SG 900D)");
}
} else {
threadContext.putHeader(ConfigConstants.OPENDISTRO_SECURITY_DLS_QUERY_HEADER, Base64Helper.serializeObject((Serializable) dlsQueries));
threadContext.putHeader(ConfigConstants.OPENDISTRO_SECURITY_DLS_QUERY_HEADER, Base64Helper.serializeObject((Serializable) dlsQueries, packageBehavior));
if (isDebugEnabled) {
log.debug("Attach DLS info: {}", dlsQueries);
}
Expand All @@ -143,7 +148,7 @@ public PrivilegesEvaluatorResponse evaluate(final ActionRequest request, final C
if (!flsFields.isEmpty()) {

if(request instanceof ClusterSearchShardsRequest && HeaderHelper.isTrustedClusterRequest(threadContext)) {
threadContext.addResponseHeader(ConfigConstants.OPENDISTRO_SECURITY_FLS_FIELDS_HEADER, Base64Helper.serializeObject((Serializable) flsFields));
threadContext.addResponseHeader(ConfigConstants.OPENDISTRO_SECURITY_FLS_FIELDS_HEADER, Base64Helper.serializeObject((Serializable) flsFields, packageBehavior));
if (isDebugEnabled) {
log.debug("Added response header for FLS info: {}", flsFields);
}
Expand All @@ -157,7 +162,7 @@ public PrivilegesEvaluatorResponse evaluate(final ActionRequest request, final C
}
}
} else {
threadContext.putHeader(ConfigConstants.OPENDISTRO_SECURITY_FLS_FIELDS_HEADER, Base64Helper.serializeObject((Serializable) flsFields));
threadContext.putHeader(ConfigConstants.OPENDISTRO_SECURITY_FLS_FIELDS_HEADER, Base64Helper.serializeObject((Serializable) flsFields, packageBehavior));
if (isDebugEnabled) {
log.debug("Attach FLS info: {}", flsFields);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ public PrivilegesEvaluatorResponse evaluate(final User user, String action0, fin
// check dlsfls
if (dlsFlsEnabled
//&& (action0.startsWith("indices:data/read") || action0.equals(ClusterSearchShardsAction.NAME))
&& dlsFlsEvaluator.evaluate(request, clusterService, resolver, requestedResolved, user, securityRoles, presponse).isComplete()) {
&& dlsFlsEvaluator.evaluate(request, clusterService, resolver, requestedResolved, user, securityRoles, presponse, clusterInfoHolder).isComplete()) {
return presponse;
}

Expand Down
Loading

0 comments on commit 00e40e4

Please sign in to comment.