Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove dangerous default executor from HandledTransportAction #100162

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
Expand All @@ -30,7 +31,7 @@ public class TransportNoopBulkAction extends HandledTransportAction<BulkRequest,

@Inject
public TransportNoopBulkAction(TransportService transportService, ActionFilters actionFilters) {
super(NoopBulkAction.NAME, transportService, actionFilters, BulkRequest::new);
super(NoopBulkAction.NAME, transportService, actionFilters, BulkRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.InternalAggregations;
Expand All @@ -30,7 +31,13 @@
public class TransportNoopSearchAction extends HandledTransportAction<SearchRequest, SearchResponse> {
@Inject
public TransportNoopSearchAction(TransportService transportService, ActionFilters actionFilters) {
super(NoopSearchAction.NAME, transportService, actionFilters, (Writeable.Reader<SearchRequest>) SearchRequest::new);
super(
NoopSearchAction.NAME,
transportService,
actionFilters,
(Writeable.Reader<SearchRequest>) SearchRequest::new,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.grok.GrokBuiltinPatterns;
import org.elasticsearch.grok.PatternBank;
import org.elasticsearch.rest.BaseRestHandler;
Expand Down Expand Up @@ -139,7 +140,7 @@ public TransportAction(TransportService transportService, ActionFilters actionFi
PatternBank legacyGrokPatterns,
PatternBank ecsV1GrokPatterns
) {
super(NAME, transportService, actionFilters, Request::new);
super(NAME, transportService, actionFilters, Request::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.legacyGrokPatterns = legacyGrokPatterns.bank();
this.sortedLegacyGrokPatterns = new TreeMap<>(this.legacyGrokPatterns);
this.ecsV1GrokPatterns = ecsV1GrokPatterns.bank();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -44,7 +45,13 @@ public TransportMultiSearchTemplateAction(
NodeClient client,
UsageService usageService
) {
super(MultiSearchTemplateAction.NAME, transportService, actionFilters, MultiSearchTemplateRequest::new);
super(
MultiSearchTemplateAction.NAME,
transportService,
actionFilters,
MultiSearchTemplateRequest::new,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.scriptService = scriptService;
this.xContentRegistry = xContentRegistry;
this.client = client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.script.Script;
Expand Down Expand Up @@ -54,7 +55,7 @@ public TransportSearchTemplateAction(
NodeClient client,
UsageService usageService
) {
super(SearchTemplateAction.NAME, transportService, actionFilters, SearchTemplateRequest::new);
super(SearchTemplateAction.NAME, transportService, actionFilters, SearchTemplateRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.scriptService = scriptService;
this.xContentRegistry = xContentRegistry;
this.client = client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.painless.PainlessScriptEngine;
import org.elasticsearch.painless.lookup.PainlessLookup;
import org.elasticsearch.rest.BaseRestHandler;
Expand Down Expand Up @@ -142,7 +143,7 @@ public static class TransportAction extends HandledTransportAction<Request, Resp

@Inject
public TransportAction(TransportService transportService, ActionFilters actionFilters, PainlessScriptEngine painlessScriptEngine) {
super(NAME, transportService, actionFilters, (Writeable.Reader<Request>) Request::new);
super(NAME, transportService, actionFilters, (Writeable.Reader<Request>) Request::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.painlessScriptEngine = painlessScriptEngine;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
Expand Down Expand Up @@ -69,7 +70,7 @@ public TransportRankEvalAction(
ScriptService scriptService,
NamedXContentRegistry namedXContentRegistry
) {
super(RankEvalAction.NAME, transportService, actionFilters, RankEvalRequest::new);
super(RankEvalAction.NAME, transportService, actionFilters, RankEvalRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.scriptService = scriptService;
this.namedXContentRegistry = namedXContentRegistry;
this.client = client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.client.internal.ParentTaskAssigningClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.BulkByScrollTask;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
Expand All @@ -40,7 +41,7 @@ public TransportDeleteByQueryAction(
ScriptService scriptService,
ClusterService clusterService
) {
super(DeleteByQueryAction.NAME, transportService, actionFilters, DeleteByQueryRequest::new);
super(DeleteByQueryAction.NAME, transportService, actionFilters, DeleteByQueryRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.threadPool = threadPool;
this.client = client;
this.scriptService = scriptService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.BulkByScrollTask;
import org.elasticsearch.index.reindex.ReindexAction;
Expand Down Expand Up @@ -82,7 +83,7 @@ protected TransportReindexAction(
TransportService transportService,
ReindexSslConfig sslConfig
) {
super(name, transportService, actionFilters, ReindexRequest::new);
super(name, transportService, actionFilters, ReindexRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.client = client;
this.reindexValidator = new ReindexValidator(settings, clusterService, indexNameExpressionResolver, autoCreateIndex);
this.reindexer = new Reindexer(clusterService, client, threadPool, scriptService, sslConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.BulkByScrollTask;
import org.elasticsearch.index.reindex.ScrollableHitSource;
Expand Down Expand Up @@ -53,7 +54,7 @@ public TransportUpdateByQueryAction(
ScriptService scriptService,
ClusterService clusterService
) {
super(UpdateByQueryAction.NAME, transportService, actionFilters, UpdateByQueryRequest::new);
super(UpdateByQueryAction.NAME, transportService, actionFilters, UpdateByQueryRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.threadPool = threadPool;
this.client = client;
this.scriptService = scriptService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.node.Node;
import org.elasticsearch.tasks.Task;
Expand All @@ -33,7 +34,7 @@ public TransportMainAction(
ActionFilters actionFilters,
ClusterService clusterService
) {
super(MainAction.NAME, transportService, actionFilters, MainRequest::new);
super(MainAction.NAME, transportService, actionFilters, MainRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.nodeName = Node.NODE_NAME_SETTING.get(settings);
this.clusterService = clusterService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
Expand Down Expand Up @@ -185,7 +186,7 @@ public TestTransportAction(
PluginsService pluginsService,
ThreadPool threadPool
) {
super(NAME, transportService, actionFilters, in -> new TestRequest());
super(NAME, transportService, actionFilters, in -> new TestRequest(), EsExecutors.DIRECT_EXECUTOR_SERVICE);
testPlugin = pluginsService.filterPlugins(TestPlugin.class).get(0);
this.threadPool = threadPool;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;

Expand Down Expand Up @@ -129,7 +130,7 @@ public static class TransportAction extends HandledTransportAction<Request, Resp

@Inject
public TransportAction(TransportService transportService, ActionFilters actionFilters, MasterHistoryService masterHistoryService) {
super(MasterHistoryAction.NAME, transportService, actionFilters, MasterHistoryAction.Request::new);
super(MasterHistoryAction.NAME, transportService, actionFilters, Request::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.masterHistoryService = masterHistoryService;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public TransportGetTaskAction(
Client client,
NamedXContentRegistry xContentRegistry
) {
super(GetTaskAction.NAME, transportService, actionFilters, GetTaskRequest::new);
super(GetTaskAction.NAME, transportService, actionFilters, GetTaskRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.transportService = transportService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.RemoteClusterServerInfo;
Expand Down Expand Up @@ -98,7 +99,7 @@ public static class TransportAction extends HandledTransportAction<Request, Resp

@Inject
public TransportAction(TransportService transportService, ActionFilters actionFilters) {
super(RemoteClusterNodesAction.NAME, transportService, actionFilters, Request::new);
super(RemoteClusterNodesAction.NAME, transportService, actionFilters, Request::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.transportService = transportService;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.TransportService;
Expand All @@ -30,7 +31,7 @@ public TransportRemoteInfoAction(
ActionFilters actionFilters,
SearchTransportService searchTransportService
) {
super(RemoteInfoAction.NAME, transportService, actionFilters, RemoteInfoRequest::new);
super(RemoteInfoAction.NAME, transportService, actionFilters, RemoteInfoRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.remoteClusterService = searchTransportService.getRemoteClusterService();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.script.ScriptContextInfo;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.tasks.Task;
Expand All @@ -24,7 +25,13 @@ public class TransportGetScriptContextAction extends HandledTransportAction<GetS

@Inject
public TransportGetScriptContextAction(TransportService transportService, ActionFilters actionFilters, ScriptService scriptService) {
super(GetScriptContextAction.NAME, transportService, actionFilters, GetScriptContextRequest::new);
super(
GetScriptContextAction.NAME,
transportService,
actionFilters,
GetScriptContextRequest::new,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.scriptService = scriptService;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
Expand All @@ -21,7 +22,13 @@ public class TransportGetScriptLanguageAction extends HandledTransportAction<Get

@Inject
public TransportGetScriptLanguageAction(TransportService transportService, ActionFilters actionFilters, ScriptService scriptService) {
super(GetScriptLanguageAction.NAME, transportService, actionFilters, GetScriptLanguageRequest::new);
super(
GetScriptLanguageAction.NAME,
transportService,
actionFilters,
GetScriptLanguageRequest::new,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.scriptService = scriptService;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.gateway.LocalAllocateDangledIndices;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -49,7 +50,13 @@ public TransportImportDanglingIndexAction(
LocalAllocateDangledIndices danglingIndexAllocator,
NodeClient nodeClient
) {
super(ImportDanglingIndexAction.NAME, transportService, actionFilters, ImportDanglingIndexRequest::new);
super(
ImportDanglingIndexAction.NAME,
transportService,
actionFilters,
ImportDanglingIndexRequest::new,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.danglingIndexAllocator = danglingIndexAllocator;
this.nodeClient = nodeClient;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;

Expand All @@ -41,7 +42,13 @@ public TransportGetFieldMappingsAction(
IndexNameExpressionResolver indexNameExpressionResolver,
NodeClient client
) {
super(GetFieldMappingsAction.NAME, transportService, actionFilters, GetFieldMappingsRequest::new);
super(
GetFieldMappingsAction.NAME,
transportService,
actionFilters,
GetFieldMappingsRequest::new,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.clusterService = clusterService;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.client = client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ public TransportAction(
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
) {
super(NAME, transportService, actionFilters, Request::new);
super(NAME, transportService, actionFilters, Request::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.remoteClusterService = transportService.getRemoteClusterService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;

Expand All @@ -38,7 +39,7 @@ protected TransportSingleItemBulkWriteAction(
Writeable.Reader<Request> requestReader,
TransportBulkAction bulkAction
) {
super(actionName, transportService, actionFilters, requestReader);
super(actionName, transportService, actionFilters, requestReader, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.bulkAction = bulkAction;
}

Expand Down
Loading