From 239679bea366fbc0a4c4e1a725f769c1f4591948 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 8 Jul 2021 09:12:58 +0200 Subject: [PATCH] Fix TransportFieldCapabilitiesAction Blocking Transport Thread (#75022) (#75081) Running a request per index could take a very long time here if the request covers a larger number of shards (especially when security is enabled). Forking it to the management pool saves the transport thread from getting blocked. Also, to make this request run quicker (again especially with security enabled) I removed the redundant index-level request fan-out here as well to save one step of redundant request handling and authorization (the shard level request is authorized separately again anyway). In a follow-up to 8.x because of 7.x BwC issues, we can refactor away the redundant index-level fan out as well. --- .../TransportFieldCapabilitiesAction.java | 81 ++++++++++--------- ...TransportFieldCapabilitiesIndexAction.java | 15 ++-- .../AbstractEqlBlockingIntegTestCase.java | 12 +-- .../AbstractSqlBlockingIntegTestCase.java | 12 +-- 4 files changed, 54 insertions(+), 66 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java index 81572b9e164ab..b694551743444 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java @@ -16,7 +16,6 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.client.Client; -import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; @@ -28,7 +27,6 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; -import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.TransportService; import java.util.ArrayList; @@ -43,9 +41,8 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction { private final ThreadPool threadPool; + private final TransportService transportService; private final ClusterService clusterService; - private final TransportFieldCapabilitiesIndexAction shardAction; - private final RemoteClusterService remoteClusterService; private final IndexNameExpressionResolver indexNameExpressionResolver; private final Predicate metadataFieldPred; @@ -53,16 +50,13 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction metadataFields = indicesService.getAllMetadataFields(); this.metadataFieldPred = metadataFields::contains; @@ -73,8 +67,9 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti // retrieve the initial timestamp in case the action is a cross cluster search long nowInMillis = request.nowInMillis() == null ? System.currentTimeMillis() : request.nowInMillis(); final ClusterState clusterState = clusterService.state(); - final Map remoteClusterIndices = remoteClusterService.groupIndices(request.indicesOptions(), - request.indices(), idx -> indexNameExpressionResolver.hasIndexAbstraction(idx, clusterState)); + final Map remoteClusterIndices = + transportService.getRemoteClusterService().groupIndices(request.indicesOptions(), + request.indices(), idx -> indexNameExpressionResolver.hasIndexAbstraction(idx, clusterState)); final OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); final String[] concreteIndices; if (localIndices == null) { @@ -119,32 +114,41 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti } }; - for (String index : concreteIndices) { - shardAction.execute( - new FieldCapabilitiesIndexRequest( - request.fields(), - index, - localIndices, - request.indexFilter(), - nowInMillis, - request.runtimeFields() - ), - new ActionListener() { - @Override - public void onResponse(FieldCapabilitiesIndexResponse result) { - if (result.canMatch()) { - indexResponses.add(result); - } - countDown.run(); - } + if (concreteIndices.length > 0) { + // fork this action to the management pool as it can fan out to a large number of child requests that get handled on SAME and + // thus would all run on the current transport thread and block it for an unacceptable amount of time + // (particularly with security enabled) + threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(ActionRunnable.wrap(listener, l -> { + for (String index : concreteIndices) { + new TransportFieldCapabilitiesIndexAction.AsyncShardsAction( + transportService, + clusterService, + new FieldCapabilitiesIndexRequest( + request.fields(), + index, + localIndices, + request.indexFilter(), + nowInMillis, + request.runtimeFields() + ), + new ActionListener() { + @Override + public void onResponse(FieldCapabilitiesIndexResponse result) { + if (result.canMatch()) { + indexResponses.add(result); + } + countDown.run(); + } - @Override - public void onFailure(Exception e) { - indexFailures.collect(e, index); - countDown.run(); - } + @Override + public void onFailure(Exception e) { + indexFailures.collect(e, index); + countDown.run(); + } + } + ).start(); } - ); + })); } // this is the cross cluster part of this API - we force the other cluster to not merge the results but instead @@ -152,7 +156,7 @@ public void onFailure(Exception e) { for (Map.Entry remoteIndices : remoteClusterIndices.entrySet()) { String clusterAlias = remoteIndices.getKey(); OriginalIndices originalIndices = remoteIndices.getValue(); - Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias); + Client remoteClusterClient = transportService.getRemoteClusterService().getRemoteClusterClient(threadPool, clusterAlias); FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest(); remoteRequest.setMergeResults(false); // we need to merge on this node remoteRequest.indicesOptions(originalIndices.indicesOptions()); @@ -212,9 +216,8 @@ private FieldCapabilitiesResponse merge( } private void addUnmappedFields(String[] indices, String field, Map typeMap) { - Set unmappedIndices = new HashSet<>(); - Arrays.stream(indices).forEach(unmappedIndices::add); - typeMap.values().stream().forEach((b) -> b.getIndices().stream().forEach(unmappedIndices::remove)); + Set unmappedIndices = new HashSet<>(Arrays.asList(indices)); + typeMap.values().forEach((b) -> b.getIndices().forEach(unmappedIndices::remove)); if (unmappedIndices.isEmpty() == false) { FieldCapabilities.Builder unmapped = new FieldCapabilities.Builder(field, "unmapped"); typeMap.put("unmapped", unmapped); @@ -239,7 +242,7 @@ private void innerMerge(Map> resp } } - private class FailureCollector { + private static final class FailureCollector { final Map, FieldCapabilitiesFailure> indexFailures = Collections.synchronizedMap( new HashMap<>() ); diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java index adf8081ddfeb9..1075fe34813a9 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java @@ -91,7 +91,7 @@ public TransportFieldCapabilitiesIndexAction(ClusterService clusterService, @Override protected void doExecute(Task task, FieldCapabilitiesIndexRequest request, ActionListener listener) { - new AsyncShardsAction(request, listener).start(); + new AsyncShardsAction(transportService, clusterService, request, listener).start(); } private FieldCapabilitiesIndexResponse shardOperation(final FieldCapabilitiesIndexRequest request) throws IOException { @@ -163,11 +163,11 @@ private boolean canMatchShard(FieldCapabilitiesIndexRequest req, SearchExecution return SearchService.queryStillMatchesAfterRewrite(searchRequest, searchExecutionContext); } - private ClusterBlockException checkGlobalBlock(ClusterState state) { + private static ClusterBlockException checkGlobalBlock(ClusterState state) { return state.blocks().globalBlockedException(ClusterBlockLevel.READ); } - private ClusterBlockException checkRequestBlock(ClusterState state, String concreteIndex) { + private static ClusterBlockException checkRequestBlock(ClusterState state, String concreteIndex) { return state.blocks().indexBlockedException(ClusterBlockLevel.READ, concreteIndex); } @@ -176,16 +176,21 @@ private ClusterBlockException checkRequestBlock(ClusterState state, String concr * {@link FieldCapabilitiesIndexRequest#indexFilter()}. In which case the shard is used * to create the final {@link FieldCapabilitiesIndexResponse}. */ - class AsyncShardsAction { + public static class AsyncShardsAction { private final FieldCapabilitiesIndexRequest request; + private final TransportService transportService; private final DiscoveryNodes nodes; private final ActionListener listener; private final GroupShardsIterator shardsIt; private volatile int shardIndex = 0; - private AsyncShardsAction(FieldCapabilitiesIndexRequest request, ActionListener listener) { + public AsyncShardsAction(TransportService transportService, + ClusterService clusterService, + FieldCapabilitiesIndexRequest request, + ActionListener listener) { this.listener = listener; + this.transportService = transportService; ClusterState clusterState = clusterService.state(); if (logger.isTraceEnabled()) { diff --git a/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AbstractEqlBlockingIntegTestCase.java b/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AbstractEqlBlockingIntegTestCase.java index 05d05d3ee1cc8..e97bf4f81ea8c 100644 --- a/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AbstractEqlBlockingIntegTestCase.java +++ b/x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AbstractEqlBlockingIntegTestCase.java @@ -209,18 +209,8 @@ public void app } logger.trace("unblocking field caps on " + nodeId); }; - final Thread originalThread = Thread.currentThread(); chain.proceed(task, action, request, - ActionListener.wrap( - resp -> { - if (originalThread == Thread.currentThread()) { - // async if we never exited the original thread - executorService.execute(() -> actionWrapper.accept(resp)); - } else { - actionWrapper.accept(resp); - } - }, - listener::onFailure) + ActionListener.wrap(resp -> executorService.execute(() -> actionWrapper.accept(resp)), listener::onFailure) ); } else { chain.proceed(task, action, request, listener); diff --git a/x-pack/plugin/sql/src/internalClusterTest/java/org/elasticsearch/xpack/sql/action/AbstractSqlBlockingIntegTestCase.java b/x-pack/plugin/sql/src/internalClusterTest/java/org/elasticsearch/xpack/sql/action/AbstractSqlBlockingIntegTestCase.java index 63ab35d775e52..ac3dc8a7349cf 100644 --- a/x-pack/plugin/sql/src/internalClusterTest/java/org/elasticsearch/xpack/sql/action/AbstractSqlBlockingIntegTestCase.java +++ b/x-pack/plugin/sql/src/internalClusterTest/java/org/elasticsearch/xpack/sql/action/AbstractSqlBlockingIntegTestCase.java @@ -227,18 +227,8 @@ public void app } logger.trace("unblocking field caps on " + nodeId); }; - final Thread originalThread = Thread.currentThread(); chain.proceed(task, action, request, - ActionListener.wrap( - resp -> { - if (originalThread == Thread.currentThread()) { - // async if we never exited the original thread - executorService.execute(() -> actionWrapper.accept(resp)); - } else { - actionWrapper.accept(resp); - } - }, - listener::onFailure) + ActionListener.wrap(resp -> executorService.execute(() -> actionWrapper.accept(resp)), listener::onFailure) ); } else { chain.proceed(task, action, request, listener);