Skip to content

Commit

Permalink
Don't hold onto ClusterState reference in AbstractSearchAsyncAction (#…
Browse files Browse the repository at this point in the history
…100901)

If the cluster state is changing quickly while searches are starting
then these  captured cluster states can consume substantial memory, and
we are only  interested in two values here.  This commit extracts the
two relevant values in the constructor, removing the cluster state
references entirely.

Closes #100120
  • Loading branch information
romseygeek authored Oct 16, 2023
1 parent a60a789 commit 31736fc
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final BiFunction<String, String, Transport.Connection> nodeIdToConnection;
private final SearchTask task;
protected final SearchPhaseResults<Result> results;
private final ClusterState clusterState;
private final long clusterStateVersion;
private final TransportVersion minTransportVersion;
private final Map<String, AliasFilter> aliasFilter;
private final Map<String, Float> concreteIndexBoosts;
private final SetOnce<AtomicArray<ShardSearchFailure>> shardFailures = new SetOnce<>();
Expand Down Expand Up @@ -161,8 +162,9 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
this.task = task;
this.listener = ActionListener.runAfter(listener, this::releaseContext);
this.nodeIdToConnection = nodeIdToConnection;
this.clusterState = clusterState;
this.concreteIndexBoosts = concreteIndexBoosts;
this.clusterStateVersion = clusterState.version();
this.minTransportVersion = clusterState.getMinTransportVersion();
this.aliasFilter = aliasFilter;
this.results = resultConsumer;
this.clusters = clusters;
Expand Down Expand Up @@ -459,7 +461,7 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha
currentPhase.getName(),
nextPhase.getName(),
resultsFrom,
clusterState.version()
clusterStateVersion
);
}
executePhase(nextPhase);
Expand Down Expand Up @@ -709,7 +711,6 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At
if (allowPartialResults == false && failures.length > 0) {
raisePhaseFailure(new SearchPhaseExecutionException("", "Shard failures", null, failures));
} else {
final TransportVersion minTransportVersion = clusterState.getMinTransportVersion();
final String scrollId = request.scroll() != null ? TransportSearchHelper.buildScrollId(queryResults) : null;
final String searchContextId;
if (buildPointInTimeFromSearchResults()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
Expand Down Expand Up @@ -192,7 +194,7 @@ public void sendExecuteQuery(
null,
shardsIter,
timeProvider,
null,
new ClusterState.Builder(new ClusterName("test")).build(),
task,
SearchResponse.Clusters.EMPTY
) {
Expand Down Expand Up @@ -324,7 +326,7 @@ private void testMixedVersionsShardsSearch(Version oldVersion, Version newVersio
EsExecutors.DIRECT_EXECUTOR_SERVICE,
resultConsumer,
searchRequest,
new ActionListener<SearchResponse>() {
new ActionListener<>() {
@Override
public void onFailure(Exception e) {
responses.add(e);
Expand All @@ -336,7 +338,7 @@ public void onResponse(SearchResponse response) {
},
shardsIter,
timeProvider,
null,
new ClusterState.Builder(new ClusterName("test")).build(),
task,
SearchResponse.Clusters.EMPTY
);
Expand Down Expand Up @@ -474,7 +476,7 @@ public void sendExecuteQuery(
null,
shardsIter,
timeProvider,
null,
new ClusterState.Builder(new ClusterName("test")).build(),
task,
SearchResponse.Clusters.EMPTY
) {
Expand Down Expand Up @@ -622,7 +624,7 @@ public void sendExecuteQuery(
null,
shardsIter,
timeProvider,
null,
new ClusterState.Builder(new ClusterName("test")).build(),
task,
SearchResponse.Clusters.EMPTY
) {
Expand Down

0 comments on commit 31736fc

Please sign in to comment.