Skip to content

Commit

Permalink
Fix downsample failure when FLS/DLS is enabled. (elastic#98587)
Browse files Browse the repository at this point in the history
If FLS/DLS is enabled (this is the case when trial/licence is active and
security is enabled) then invoking the downsample API results in
immediate failure.

The downsample shard persistent task executor opens a searcher, but
security didn't set indices permissions in the thread local (this
happens via SecurityActionFilter). This will only happen on indices
actions (which are actions with a request that implement IndicesRequest.
This change does this by delegating to a transport action that executes
always locally, and this way security prepares thread local headers
correctly.

This adds another layer of indirection, but without doing this FLS/DLS
wouldn't work.

Closes elastic#98569

(Marking as a non-issue, because this a bug in not released code)
  • Loading branch information
martijnvg authored and csoulios committed Aug 18, 2023
1 parent 0113fb2 commit 2de7db9
Show file tree
Hide file tree
Showing 8 changed files with 615 additions and 157 deletions.
34 changes: 34 additions & 0 deletions x-pack/plugin/downsample/qa/with-security/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import org.elasticsearch.gradle.Version
import org.elasticsearch.gradle.internal.info.BuildParams

apply plugin: 'elasticsearch.legacy-yaml-rest-test'
apply plugin: 'elasticsearch.legacy-yaml-rest-compat-test'

dependencies {
yamlRestTestImplementation project(path: xpackModule('rollup'))
}

restResources {
restApi {
include '_common', 'bulk', 'cluster', 'indices', 'search', 'ingest.put_pipeline', 'ingest.delete_pipeline'
}
}

testClusters.configureEach {
testDistribution = 'DEFAULT'
setting 'xpack.license.self_generated.type', 'trial'
setting 'xpack.security.enabled', 'true'
user username: 'elastic_admin', password: 'admin-password'
}

if (BuildParams.inFipsJvm){
// This test cluster is using a BASIC license and FIPS 140 mode is not supported in BASIC
tasks.named("yamlRestTest").configure{enabled = false }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.downsample;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;

public class DownsampleRestIT extends ESClientYamlSuiteTestCase {

public DownsampleRestIT(final ClientYamlTestCandidate testCandidate) {
super(testCandidate);
}

@Override
protected Settings restClientSettings() {
String authentication = basicAuthHeaderValue("elastic_admin", new SecureString("admin-password".toCharArray()));
return Settings.builder().put(super.restClientSettings()).put(ThreadContext.PREFIX + ".Authorization", authentication).build();
}

@ParametersFactory
public static Iterable<Object[]> parameters() throws Exception {
return ESClientYamlSuiteTestCase.createParameters();
}

}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
Expand All @@ -22,31 +21,23 @@
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.persistent.PersistentTaskParams;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.PersistentTaskPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.tracing.Tracer;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xpack.core.downsample.DownsampleIndexerAction;
import org.elasticsearch.xpack.core.downsample.DownsampleShardPersistentTaskState;
import org.elasticsearch.xpack.core.downsample.DownsampleShardTask;

import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;

Expand All @@ -55,8 +46,6 @@ public class Downsample extends Plugin implements ActionPlugin, PersistentTaskPl
public static final String DOWSAMPLE_TASK_THREAD_POOL_NAME = "downsample_indexing";
private static final int DOWNSAMPLE_TASK_THREAD_POOL_QUEUE_SIZE = 256;

private IndicesService indicesService;

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
final FixedExecutorBuilder downsample = new FixedExecutorBuilder(
Expand All @@ -74,7 +63,11 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return List.of(
new ActionHandler<>(DownsampleIndexerAction.INSTANCE, TransportDownsampleIndexerAction.class),
new ActionHandler<>(DownsampleAction.INSTANCE, TransportDownsampleAction.class)
new ActionHandler<>(DownsampleAction.INSTANCE, TransportDownsampleAction.class),
new ActionHandler<>(
DownsampleShardPersistentTaskExecutor.DelegatingAction.INSTANCE,
DownsampleShardPersistentTaskExecutor.DelegatingAction.TA.class
)
);
}

Expand All @@ -99,14 +92,7 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
SettingsModule settingsModule,
IndexNameExpressionResolver expressionResolver
) {
return List.of(
new DownsampleShardPersistentTaskExecutor(
client,
this.indicesService,
DownsampleShardTask.TASK_NAME,
DOWSAMPLE_TASK_THREAD_POOL_NAME
)
);
return List.of(new DownsampleShardPersistentTaskExecutor(client, DownsampleShardTask.TASK_NAME, DOWSAMPLE_TASK_THREAD_POOL_NAME));
}

@Override
Expand All @@ -133,49 +119,7 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
DownsampleShardPersistentTaskState.NAME,
DownsampleShardPersistentTaskState::readFromStream
),
new NamedWriteableRegistry.Entry(
PersistentTaskParams.class,
DownsampleShardTaskParams.NAME,
DownsampleShardTaskParams::readFromStream
)
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, DownsampleShardTaskParams.NAME, DownsampleShardTaskParams::new)
);
}

@Override
public Collection<Object> createComponents(
final Client client,
final ClusterService clusterService,
final ThreadPool threadPool,
final ResourceWatcherService resourceWatcherService,
final ScriptService scriptService,
final NamedXContentRegistry xContentRegistry,
final Environment environment,
final NodeEnvironment nodeEnvironment,
final NamedWriteableRegistry namedWriteableRegistry,
final IndexNameExpressionResolver indexNameExpressionResolver,
final Supplier<RepositoriesService> repositoriesServiceSupplier,
final Tracer tracer,
final AllocationService allocationService,
final IndicesService indicesService
) {
final Collection<Object> components = super.createComponents(
client,
clusterService,
threadPool,
resourceWatcherService,
scriptService,
xContentRegistry,
environment,
nodeEnvironment,
namedWriteableRegistry,
indexNameExpressionResolver,
repositoriesServiceSupplier,
tracer,
allocationService,
indicesService
);

this.indicesService = indicesService;
return components;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,6 @@ public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOExcept

if (task.getNumIndexed() != task.getNumSent()) {
task.setDownsampleShardIndexerStatus(DownsampleShardIndexerStatus.FAILED);
task.updatePersistentTaskState(
new DownsampleShardPersistentTaskState(DownsampleShardIndexerStatus.FAILED, null),
ActionListener.noop()
);
final String error = "Downsampling task ["
+ task.getPersistentTaskId()
+ "] on shard "
Expand All @@ -199,10 +195,6 @@ public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOExcept
+ task.getNumFailed()
+ "]";
logger.info(error);
task.updatePersistentTaskState(
new DownsampleShardPersistentTaskState(DownsampleShardIndexerStatus.FAILED, null),
ActionListener.noop()
);
throw new DownsampleShardIndexerException(error, false);
}

Expand Down
Loading

0 comments on commit 2de7db9

Please sign in to comment.