From 1dd48664a5c2b26638d062896d712c9217ffc8d3 Mon Sep 17 00:00:00 2001 From: sudarshan baliga Date: Thu, 22 Jun 2023 13:06:28 +0530 Subject: [PATCH] Added transport action for bulk async shard fetch for primary shards Signed-off-by: sudarshan baliga --- CHANGELOG.md | 3 +- .../gateway/AsyncShardsFetchPerNode.java | 32 ++ ...portNodesBulkListGatewayStartedShards.java | 429 ++++++++++++++++++ ...ansportNodesGatewayStartedShardHelper.java | 189 ++++++++ ...ransportNodesListGatewayStartedShards.java | 106 ++--- 5 files changed, 687 insertions(+), 72 deletions(-) create mode 100644 server/src/main/java/org/opensearch/gateway/AsyncShardsFetchPerNode.java create mode 100644 server/src/main/java/org/opensearch/gateway/TransportNodesBulkListGatewayStartedShards.java create mode 100644 server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 75287db6b4f07..05261f2f52d46 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Allow mmap to use new JDK-19 preview APIs in Apache Lucene 9.4+ ([#5151](https://github.com/opensearch-project/OpenSearch/pull/5151)) - Add events correlation engine plugin ([#6854](https://github.com/opensearch-project/OpenSearch/issues/6854)) - Add support for ignoring missing Javadoc on generated code using annotation ([#7604](https://github.com/opensearch-project/OpenSearch/pull/7604)) +- Add PSA transport action for bulk async fetch of shards ([#5098](https://github.com/opensearch-project/OpenSearch/issues/5098)) ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 @@ -44,7 +45,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Change http code for DecommissioningFailedException from 500 to 400 ([#5283](https://github.com/opensearch-project/OpenSearch/pull/5283)) - Improve summary error message for invalid setting updates ([#4792](https://github.com/opensearch-project/OpenSearch/pull/4792)) - Pass localNode info to all plugins on node start ([#7919](https://github.com/opensearch-project/OpenSearch/pull/7919)) - +- Modified the existing async shard fetch transport action to use the helper functions added for bulk fetching ([#5098](https://github.com/opensearch-project/OpenSearch/issues/5098)) ### Deprecated ### Removed diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardsFetchPerNode.java b/server/src/main/java/org/opensearch/gateway/AsyncShardsFetchPerNode.java new file mode 100644 index 0000000000000..4d06bfeb52f8f --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardsFetchPerNode.java @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.action.support.nodes.BaseNodesResponse; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.lease.Releasable; +import org.opensearch.index.shard.ShardId; + +import java.util.Map; + +/** + * This class is responsible for fetching shard data from nodes. It is analogous to AsyncShardFetch class since it fetches + * the data in asynchronous manner too. 
+ * @param + */ +public abstract class AsyncShardsFetchPerNode implements Releasable { + /** + * An action that lists the relevant shard data that needs to be fetched. + */ + public interface Lister, NodeResponse extends BaseNodeResponse> { + void list(DiscoveryNode[] nodes, Map shardsIdMap, ActionListener listener); + } +} diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesBulkListGatewayStartedShards.java b/server/src/main/java/org/opensearch/gateway/TransportNodesBulkListGatewayStartedShards.java new file mode 100644 index 0000000000000..8403113dda8c1 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesBulkListGatewayStartedShards.java @@ -0,0 +1,429 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.gateway; + +import org.opensearch.OpenSearchException; +import org.opensearch.Version; +import org.opensearch.action.ActionListener; +import org.opensearch.action.ActionType; +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.transport.TransportRequest; +import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.action.support.nodes.BaseNodesRequest; +import org.opensearch.action.support.nodes.BaseNodesResponse; +import org.opensearch.action.support.nodes.TransportNodesAction; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardId; +import org.opensearch.index.shard.ShardStateMetadata; +import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * This transport action is used to fetch all unassigned shard version from each node during primary allocation in {@link GatewayAllocator}. 
+ * We use this to find out which node holds the latest shard version and which of them used to be a primary in order to allocate + * shards after node or cluster restarts. + * + * @opensearch.internal + */ +public class TransportNodesBulkListGatewayStartedShards extends TransportNodesAction< + TransportNodesBulkListGatewayStartedShards.Request, + TransportNodesBulkListGatewayStartedShards.NodesGatewayStartedShards, + TransportNodesBulkListGatewayStartedShards.NodeRequest, + TransportNodesBulkListGatewayStartedShards.BulkOfNodeGatewayStartedShards> + implements + AsyncShardsFetchPerNode.Lister< + TransportNodesBulkListGatewayStartedShards.NodesGatewayStartedShards, + TransportNodesBulkListGatewayStartedShards.BulkOfNodeGatewayStartedShards> { + + public static final String ACTION_NAME = "internal:gateway/local/bulk_started_shards"; + public static final ActionType TYPE = new ActionType<>(ACTION_NAME, NodesGatewayStartedShards::new); + + private final Settings settings; + private final NodeEnvironment nodeEnv; + private final IndicesService indicesService; + private final NamedXContentRegistry namedXContentRegistry; + + @Inject + public TransportNodesBulkListGatewayStartedShards( + Settings settings, + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + NodeEnvironment env, + IndicesService indicesService, + NamedXContentRegistry namedXContentRegistry + ) { + super( + ACTION_NAME, + threadPool, + clusterService, + transportService, + actionFilters, + Request::new, + NodeRequest::new, + ThreadPool.Names.FETCH_SHARD_STARTED, + BulkOfNodeGatewayStartedShards.class + ); + this.settings = settings; + this.nodeEnv = env; + this.indicesService = indicesService; + this.namedXContentRegistry = namedXContentRegistry; + } + + @Override + public void list(DiscoveryNode[] nodes, Map shardsIdMap, ActionListener listener) { + execute(new Request(nodes, shardsIdMap), listener); + } + + @Override + protected NodeRequest newNodeRequest(Request request) { + return new NodeRequest(request); + } + + @Override + protected BulkOfNodeGatewayStartedShards newNodeResponse(StreamInput in) throws IOException { + return new BulkOfNodeGatewayStartedShards(in); + } + + @Override + protected NodesGatewayStartedShards newResponse( + Request request, + List responses, + List failures + ) { + return new NodesGatewayStartedShards(clusterService.getClusterName(), responses, failures); + } + + /** + * This function is similar to nodeoperation method of {@link TransportNodesListGatewayStartedShards} we loop over + * the shards here to fetch the shard result in bulk. 
+ * + * @param request + * @return BulkOfNodeGatewayStartedShards + */ + @Override + protected BulkOfNodeGatewayStartedShards nodeOperation(NodeRequest request) { + Map shardsOnNode = new HashMap<>(); + for (Map.Entry shardToCustomDataPathEntry : request.shardIdsWithCustomDataPath.entrySet()) { + try { + final ShardId shardId = shardToCustomDataPathEntry.getKey(); + logger.trace("{} loading local shard state info", shardId); + ShardStateMetadata shardStateMetadata = ShardStateMetadata.FORMAT.loadLatestState( + logger, + namedXContentRegistry, + nodeEnv.availableShardPaths(shardId) + ); + if (shardStateMetadata != null) { + if (indicesService.getShardOrNull(shardId) == null) { + final String customDataPath = TransportNodesGatewayStartedShardHelper.getCustomDataPathForShard( + logger, + shardId, + shardToCustomDataPathEntry.getValue(), + settings, + clusterService + ); + // we don't have an open shard on the store, validate the files on disk are openable + Exception shardCorruptionException = TransportNodesGatewayStartedShardHelper.getShardCorruption( + logger, + nodeEnv, + shardId, + shardStateMetadata, + customDataPath + ); + if (shardCorruptionException != null) { + String allocationId = shardStateMetadata.allocationId != null ? shardStateMetadata.allocationId.getId() : null; + shardsOnNode.put( + shardId, + new NodeGatewayStartedShards(allocationId, shardStateMetadata.primary, null, shardCorruptionException) + ); + continue; + } + } + logger.debug("{} shard state info found: [{}]", shardId, shardStateMetadata); + String allocationId = shardStateMetadata.allocationId != null ? shardStateMetadata.allocationId.getId() : null; + final IndexShard shard = indicesService.getShardOrNull(shardId); + shardsOnNode.put( + shardId, + new NodeGatewayStartedShards( + allocationId, + shardStateMetadata.primary, + shard != null ? shard.getLatestReplicationCheckpoint() : null + ) + ); + } else { + logger.trace("{} no local shard info found", shardId); + shardsOnNode.put(shardId, new NodeGatewayStartedShards(null, false, null)); + } + } catch (Exception e) { + throw new OpenSearchException("failed to load started shards", e); + } + } + return new BulkOfNodeGatewayStartedShards(clusterService.localNode(), shardsOnNode); + } + + /** + * The nodes request. + * + * @opensearch.internal + */ + public static class Request extends BaseNodesRequest { + private final Map shardIdsWithCustomDataPath; + + public Request(StreamInput in) throws IOException { + super(in); + shardIdsWithCustomDataPath = in.readMap(ShardId::new, StreamInput::readString); + } + + public Request(DiscoveryNode[] nodes, Map shardIdStringMap) { + super(nodes); + this.shardIdsWithCustomDataPath = Objects.requireNonNull(shardIdStringMap); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeMap(shardIdsWithCustomDataPath, (o, k) -> k.writeTo(o), StreamOutput::writeString); + } + + public Map getShardIdsMap() { + return shardIdsWithCustomDataPath; + } + } + + /** + * The nodes response. 
+ * + * @opensearch.internal + */ + public static class NodesGatewayStartedShards extends BaseNodesResponse { + + public NodesGatewayStartedShards(StreamInput in) throws IOException { + super(in); + } + + public NodesGatewayStartedShards( + ClusterName clusterName, + List nodes, + List failures + ) { + super(clusterName, nodes, failures); + } + + @Override + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readList(BulkOfNodeGatewayStartedShards::new); + } + + @Override + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeList(nodes); + } + } + + /** + * The request. + * + * @opensearch.internal + */ + public static class NodeRequest extends TransportRequest { + + private final Map shardIdsWithCustomDataPath; + + public NodeRequest(StreamInput in) throws IOException { + super(in); + shardIdsWithCustomDataPath = in.readMap(ShardId::new, StreamInput::readString); + } + + public NodeRequest(Request request) { + + this.shardIdsWithCustomDataPath = Objects.requireNonNull(request.getShardIdsMap()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeMap(shardIdsWithCustomDataPath, (o, k) -> k.writeTo(o), StreamOutput::writeString); + } + + } + + /** + * The response as stored by TransportNodesListGatewayStartedShards(to maintain backward compatibility). + * + * @opensearch.internal + */ + public static class NodeGatewayStartedShards { + private final String allocationId; + private final boolean primary; + private final Exception storeException; + private final ReplicationCheckpoint replicationCheckpoint; + + public NodeGatewayStartedShards(StreamInput in) throws IOException { + allocationId = in.readOptionalString(); + primary = in.readBoolean(); + if (in.readBoolean()) { + storeException = in.readException(); + } else { + storeException = null; + } + if (in.getVersion().onOrAfter(Version.V_2_3_0) && in.readBoolean()) { + replicationCheckpoint = new ReplicationCheckpoint(in); + } else { + replicationCheckpoint = null; + } + } + + public NodeGatewayStartedShards(String allocationId, boolean primary, ReplicationCheckpoint replicationCheckpoint) { + this(allocationId, primary, replicationCheckpoint, null); + } + + public NodeGatewayStartedShards( + String allocationId, + boolean primary, + ReplicationCheckpoint replicationCheckpoint, + Exception storeException + ) { + this.allocationId = allocationId; + this.primary = primary; + this.replicationCheckpoint = replicationCheckpoint; + this.storeException = storeException; + } + + public String allocationId() { + return this.allocationId; + } + + public boolean primary() { + return this.primary; + } + + public ReplicationCheckpoint replicationCheckpoint() { + return this.replicationCheckpoint; + } + + public Exception storeException() { + return this.storeException; + } + + public void writeTo(StreamOutput out) throws IOException { + TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShardsWriteTo( + out, + allocationId, + primary, + storeException, + replicationCheckpoint + ); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + NodeGatewayStartedShards that = (NodeGatewayStartedShards) o; + + return primary == that.primary + && Objects.equals(allocationId, that.allocationId) + && Objects.equals(storeException, that.storeException) + && Objects.equals(replicationCheckpoint, that.replicationCheckpoint); + } + 
+ @Override + public int hashCode() { + return TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShardsHashCode( + allocationId, + primary, + storeException, + replicationCheckpoint + ); + } + + @Override + public String toString() { + return TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShardsToString( + allocationId, + primary, + storeException, + replicationCheckpoint + ); + } + } + + public static class BulkOfNodeGatewayStartedShards extends BaseNodeResponse { + public Map getBulkOfNodeGatewayStartedShards() { + return bulkOfNodeGatewayStartedShards; + } + + private final Map bulkOfNodeGatewayStartedShards; + + public BulkOfNodeGatewayStartedShards(StreamInput in) throws IOException { + super(in); + this.bulkOfNodeGatewayStartedShards = in.readMap(ShardId::new, NodeGatewayStartedShards::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeMap(bulkOfNodeGatewayStartedShards, (o, k) -> k.writeTo(o), (o, v) -> v.writeTo(o)); + } + + public BulkOfNodeGatewayStartedShards(DiscoveryNode node, Map bulkOfNodeGatewayStartedShards) { + super(node); + this.bulkOfNodeGatewayStartedShards = bulkOfNodeGatewayStartedShards; + } + + } +} diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java b/server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java new file mode 100644 index 0000000000000..f3fcaf29c00ad --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java @@ -0,0 +1,189 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.OpenSearchException; +import org.opensearch.Version; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.settings.Settings; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.shard.ShardId; +import org.opensearch.index.shard.ShardPath; +import org.opensearch.index.shard.ShardStateMetadata; +import org.opensearch.index.store.Store; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; + +import java.io.IOException; + +/** + * This class has the common code used in TransportNodesBulkListGatewayStartedShards and TransportNodesListGatewayStartedShards + */ +public class TransportNodesGatewayStartedShardHelper { + + /** + * Helper function for getting the data path of the shard that is used to look up information for this shard. + * If the dataPathInRequest passed to the method is not empty then same is returned. Else the custom data path is returned + * from the indexSettings fetched from the cluster state metadata for the specified shard. 
+ * + * @param logger + * @param shardId + * @param dataPathInRequest + * @param settings + * @param clusterService + * @return String + */ + public static String getCustomDataPathForShard( + Logger logger, + ShardId shardId, + String dataPathInRequest, + Settings settings, + ClusterService clusterService + ) { + if (dataPathInRequest != null) return dataPathInRequest; + // TODO: Fallback for BWC with older OpenSearch versions. + // Remove once request.getCustomDataPath() always returns non-null + final IndexMetadata metadata = clusterService.state().metadata().index(shardId.getIndex()); + if (metadata != null) { + return new IndexSettings(metadata, settings).customDataPath(); + } else { + logger.trace("{} node doesn't have meta data for the requests index", shardId); + throw new OpenSearchException("node doesn't have meta data for index " + shardId.getIndex()); + } + } + + /** + * Helper function for checking if the shard file exists and is not corrupted. We return the specific exception if + * the shard file is corrupted. else null value is returned. + * + * @param logger + * @param nodeEnv + * @param shardId + * @param shardStateMetadata + * @param customDataPath + * @return Exception + */ + public static Exception getShardCorruption( + Logger logger, + NodeEnvironment nodeEnv, + ShardId shardId, + ShardStateMetadata shardStateMetadata, + String customDataPath + ) { + ShardPath shardPath = null; + try { + shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, customDataPath); + if (shardPath == null) { + throw new IllegalStateException(shardId + " no shard path found"); + } + Store.tryOpenIndex(shardPath.resolveIndex(), shardId, nodeEnv::shardLock, logger); + } catch (Exception exception) { + final ShardPath finalShardPath = shardPath; + logger.trace( + () -> new ParameterizedMessage( + "{} can't open index for shard [{}] in path [{}]", + shardId, + shardStateMetadata, + (finalShardPath != null) ? finalShardPath.resolveIndex() : "" + ), + exception + ); + return exception; + } + return null; + } + + /** + * Helper function for getting the string representation of the NodeGatewayStartedShards object + * + * @param allocationId + * @param primary + * @param storeException + * @param replicationCheckpoint + * @return String + */ + public static String NodeGatewayStartedShardsToString( + String allocationId, + boolean primary, + Exception storeException, + ReplicationCheckpoint replicationCheckpoint + ) { + StringBuilder buf = new StringBuilder(); + buf.append("NodeGatewayStartedShards[").append("allocationId=").append(allocationId).append(",primary=").append(primary); + if (storeException != null) { + buf.append(",storeException=").append(storeException); + } + if (replicationCheckpoint != null) { + buf.append(",ReplicationCheckpoint=").append(replicationCheckpoint.toString()); + } + buf.append("]"); + return buf.toString(); + } + + /** + * Helper function for computing the hashcode of the NodeGatewayStartedShards object + * + * @param allocationId + * @param primary + * @param storeException + * @param replicationCheckpoint + * @return int + */ + public static int NodeGatewayStartedShardsHashCode( + String allocationId, + boolean primary, + Exception storeException, + ReplicationCheckpoint replicationCheckpoint + ) { + int result = (allocationId != null ? allocationId.hashCode() : 0); + result = 31 * result + (primary ? 1 : 0); + result = 31 * result + (storeException != null ? storeException.hashCode() : 0); + result = 31 * result + (replicationCheckpoint != null ? 
replicationCheckpoint.hashCode() : 0); + return result; + } + + /** + * Helper function for NodeGatewayStartedShardsWriteTo method + * + * @param out + * @param allocationId + * @param primary + * @param storeException + * @param replicationCheckpoint + * @throws IOException + */ + public static void NodeGatewayStartedShardsWriteTo( + StreamOutput out, + String allocationId, + boolean primary, + Exception storeException, + ReplicationCheckpoint replicationCheckpoint + ) throws IOException { + out.writeOptionalString(allocationId); + out.writeBoolean(primary); + if (storeException != null) { + out.writeBoolean(true); + out.writeException(storeException); + } else { + out.writeBoolean(false); + } + if (out.getVersion().onOrAfter(Version.V_2_3_0)) { + if (replicationCheckpoint != null) { + out.writeBoolean(true); + replicationCheckpoint.writeTo(out); + } else { + out.writeBoolean(false); + } + } + } +} diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java index b529557aa9815..90fda40da2349 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java @@ -32,7 +32,6 @@ package org.opensearch.gateway; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.action.ActionListener; @@ -44,7 +43,6 @@ import org.opensearch.action.support.nodes.BaseNodesResponse; import org.opensearch.action.support.nodes.TransportNodesAction; import org.opensearch.cluster.ClusterName; -import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; @@ -54,12 +52,9 @@ import org.opensearch.common.settings.Settings; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.NodeEnvironment; -import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; -import org.opensearch.index.shard.ShardPath; import org.opensearch.index.shard.ShardStateMetadata; -import org.opensearch.index.store.Store; import org.opensearch.indices.IndicesService; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.threadpool.ThreadPool; @@ -158,41 +153,23 @@ protected NodeGatewayStartedShards nodeOperation(NodeRequest request) { nodeEnv.availableShardPaths(request.shardId) ); if (shardStateMetadata != null) { - if (indicesService.getShardOrNull(shardId) == null - && shardStateMetadata.indexDataLocation == ShardStateMetadata.IndexDataLocation.LOCAL) { - final String customDataPath; - if (request.getCustomDataPath() != null) { - customDataPath = request.getCustomDataPath(); - } else { - // TODO: Fallback for BWC with older OpenSearch versions. 
- // Remove once request.getCustomDataPath() always returns non-null - final IndexMetadata metadata = clusterService.state().metadata().index(shardId.getIndex()); - if (metadata != null) { - customDataPath = new IndexSettings(metadata, settings).customDataPath(); - } else { - logger.trace("{} node doesn't have meta data for the requests index", shardId); - throw new OpenSearchException("node doesn't have meta data for index " + shardId.getIndex()); - } - } + if (indicesService.getShardOrNull(shardId) == null) { + final String customDataPath = TransportNodesGatewayStartedShardHelper.getCustomDataPathForShard( + logger, + shardId, + request.getCustomDataPath(), + settings, + clusterService + ); // we don't have an open shard on the store, validate the files on disk are openable - ShardPath shardPath = null; - try { - shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, customDataPath); - if (shardPath == null) { - throw new IllegalStateException(shardId + " no shard path found"); - } - Store.tryOpenIndex(shardPath.resolveIndex(), shardId, nodeEnv::shardLock, logger); - } catch (Exception exception) { - final ShardPath finalShardPath = shardPath; - logger.trace( - () -> new ParameterizedMessage( - "{} can't open index for shard [{}] in path [{}]", - shardId, - shardStateMetadata, - (finalShardPath != null) ? finalShardPath.resolveIndex() : "" - ), - exception - ); + Exception exception = TransportNodesGatewayStartedShardHelper.getShardCorruption( + logger, + nodeEnv, + shardId, + shardStateMetadata, + customDataPath + ); + if (exception != null) { String allocationId = shardStateMetadata.allocationId != null ? shardStateMetadata.allocationId.getId() : null; return new NodeGatewayStartedShards( clusterService.localNode(), @@ -203,7 +180,6 @@ protected NodeGatewayStartedShards nodeOperation(NodeRequest request) { ); } } - logger.debug("{} shard state info found: [{}]", shardId, shardStateMetadata); String allocationId = shardStateMetadata.allocationId != null ? shardStateMetadata.allocationId.getId() : null; final IndexShard shard = indicesService.getShardOrNull(shardId); @@ -410,22 +386,13 @@ public Exception storeException() { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeOptionalString(allocationId); - out.writeBoolean(primary); - if (storeException != null) { - out.writeBoolean(true); - out.writeException(storeException); - } else { - out.writeBoolean(false); - } - if (out.getVersion().onOrAfter(Version.V_2_3_0)) { - if (replicationCheckpoint != null) { - out.writeBoolean(true); - replicationCheckpoint.writeTo(out); - } else { - out.writeBoolean(false); - } - } + TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShardsWriteTo( + out, + allocationId, + primary, + storeException, + replicationCheckpoint + ); } @Override @@ -447,25 +414,22 @@ public boolean equals(Object o) { @Override public int hashCode() { - int result = (allocationId != null ? allocationId.hashCode() : 0); - result = 31 * result + (primary ? 1 : 0); - result = 31 * result + (storeException != null ? storeException.hashCode() : 0); - result = 31 * result + (replicationCheckpoint != null ? 
replicationCheckpoint.hashCode() : 0); - return result; + return TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShardsHashCode( + allocationId, + primary, + storeException, + replicationCheckpoint + ); } @Override public String toString() { - StringBuilder buf = new StringBuilder(); - buf.append("NodeGatewayStartedShards[").append("allocationId=").append(allocationId).append(",primary=").append(primary); - if (storeException != null) { - buf.append(",storeException=").append(storeException); - } - if (replicationCheckpoint != null) { - buf.append(",ReplicationCheckpoint=").append(replicationCheckpoint.toString()); - } - buf.append("]"); - return buf.toString(); + return TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShardsToString( + allocationId, + primary, + storeException, + replicationCheckpoint + ); } } }
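
Reviewer note (not part of the patch): the sketch below shows how a caller might drive the new bulk action through the AsyncShardsFetchPerNode.Lister contract once it is wired into primary shard allocation. The method name fetchStartedShardsInBulk, the injected "lister" instance, the dataNodes/shardsToFetch arguments, and the logger are assumptions made for illustration only; the Lister.list signature, NodesGatewayStartedShards, BulkOfNodeGatewayStartedShards, and the NodeGatewayStartedShards accessors come from this patch, while getNodes(), failures(), and getNode() come from the existing TransportNodesAction base classes.

import java.util.Map;

import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.gateway.TransportNodesBulkListGatewayStartedShards.BulkOfNodeGatewayStartedShards;
import org.opensearch.gateway.TransportNodesBulkListGatewayStartedShards.NodesGatewayStartedShards;
import org.opensearch.index.shard.ShardId;

// Hypothetical caller: "lister" is an injected TransportNodesBulkListGatewayStartedShards instance.
void fetchStartedShardsInBulk(
    DiscoveryNode[] dataNodes,
    Map<ShardId, String> shardsToFetch, // ShardId -> custom data path (may be null), one entry per unassigned primary
    TransportNodesBulkListGatewayStartedShards lister,
    Logger logger
) {
    // A single Request carries the whole batch, so each node is contacted once per fetch round
    // instead of once per shard.
    lister.list(dataNodes, shardsToFetch, new ActionListener<NodesGatewayStartedShards>() {
        @Override
        public void onResponse(NodesGatewayStartedShards response) {
            // One BulkOfNodeGatewayStartedShards per responding node; each holds a
            // ShardId -> NodeGatewayStartedShards map covering every shard in the request.
            for (BulkOfNodeGatewayStartedShards nodeResult : response.getNodes()) {
                nodeResult.getBulkOfNodeGatewayStartedShards().forEach((shard, state) ->
                    logger.trace(
                        "node [{}] shard {}: allocationId [{}], primary [{}]",
                        nodeResult.getNode().getId(),
                        shard,
                        state.allocationId(),
                        state.primary()
                    )
                );
            }
            response.failures().forEach(failure -> logger.warn("node-level bulk fetch failure", failure));
        }

        @Override
        public void onFailure(Exception e) {
            logger.warn("bulk fetch of started shards failed", e);
        }
    });
}

The per-node ShardId map mirrors the per-shard NodeGatewayStartedShards payload of the existing single-shard action, which keeps the response shape backward compatible while reducing the fetch cost to one transport round-trip per node.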