From fb5a9833be5f56334635fa1f001be04362c84233 Mon Sep 17 00:00:00 2001 From: sudarshan baliga Date: Thu, 29 Jun 2023 17:25:04 +0530 Subject: [PATCH] Add batch async shard fetch transport action for replica #8218 Modify async replica shard fetch to use helper functions Signed-off-by: sudarshan baliga --- .../gateway/AsyncShardsFetchPerNode.java | 36 ++ .../gateway/ReplicaShardAllocator.java | 26 +- ...sportNodesBatchListShardStoreMetadata.java | 323 ++++++++++++++++++ .../TransportNodesListShardStoreMetadata.java | 192 +---------- ...portNodesListShardStoreMetadataHelper.java | 230 +++++++++++++ .../gateway/ReplicaShardAllocatorTests.java | 11 +- .../opensearch/index/store/StoreTests.java | 10 +- 7 files changed, 627 insertions(+), 201 deletions(-) create mode 100644 server/src/main/java/org/opensearch/gateway/AsyncShardsFetchPerNode.java create mode 100644 server/src/main/java/org/opensearch/indices/store/TransportNodesBatchListShardStoreMetadata.java create mode 100644 server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardsFetchPerNode.java b/server/src/main/java/org/opensearch/gateway/AsyncShardsFetchPerNode.java new file mode 100644 index 0000000000000..6e028a2f4513b --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardsFetchPerNode.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.action.support.nodes.BaseNodesResponse; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.lease.Releasable; +import org.opensearch.index.shard.ShardId; + +import java.util.Map; + + +/** + * This class is responsible for fetching shard data from nodes. It is analogous to AsyncShardFetch class except + * that we fetch batch of shards in this class from single transport request to a node. + * @param + * + * @opensearch.internal + */ +public abstract class AsyncShardsFetchPerNode implements Releasable { + + /** + * An action that lists the relevant shard data that needs to be fetched. + */ + public interface Lister, NodeResponse extends BaseNodeResponse> { + void list(DiscoveryNode[] nodes, Map shardIdsWithCustomDataPath, ActionListener listener); + } +} diff --git a/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java b/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java index 5216dd2fcb4b5..e549344dda9d0 100644 --- a/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ReplicaShardAllocator.java @@ -51,8 +51,8 @@ import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; import org.opensearch.index.store.StoreFileMetadata; -import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; import org.opensearch.indices.store.TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata; +import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper; import java.util.ArrayList; import java.util.Collections; @@ -106,7 +106,7 @@ public void processExistingRecoveries(RoutingAllocation allocation) { assert primaryShard != null : "the replica shard can be allocated on at least one node, so there must be an active primary"; assert primaryShard.currentNodeId() != null; final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId()); - final TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores); + final TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores); if (primaryStore == null) { // if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed) // just let the recovery find it out, no need to do anything about it for the initializing shard @@ -223,7 +223,7 @@ public AllocateUnassignedDecision makeAllocationDecision( } assert primaryShard.currentNodeId() != null; final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId()); - final TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores); + final TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore = findStore(primaryNode, shardStores); if (primaryStore == null) { // if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed) // we want to let the replica be allocated in order to expose the actual problem with the primary that the replica @@ -357,7 +357,7 @@ private static List augmentExplanationsWithStoreInfo( /** * Finds the store for the assigned shard in the fetched data, returns null if none is found. */ - private static TransportNodesListShardStoreMetadata.StoreFilesMetadata findStore( + private static TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata findStore( DiscoveryNode node, AsyncShardFetch.FetchResult data ) { @@ -373,7 +373,7 @@ private MatchingNodes findMatchingNodes( RoutingAllocation allocation, boolean noMatchFailedNodes, DiscoveryNode primaryNode, - TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore, + TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore, AsyncShardFetch.FetchResult data, boolean explain ) { @@ -386,7 +386,7 @@ private MatchingNodes findMatchingNodes( && shard.unassignedInfo().getFailedNodeIds().contains(discoNode.getId())) { continue; } - TransportNodesListShardStoreMetadata.StoreFilesMetadata storeFilesMetadata = nodeStoreEntry.getValue().storeFilesMetadata(); + TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata = nodeStoreEntry.getValue().storeFilesMetadata(); // we don't have any files at all, it is an empty index if (storeFilesMetadata.isEmpty()) { continue; @@ -442,8 +442,8 @@ private MatchingNodes findMatchingNodes( } private static long computeMatchingBytes( - TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore, - TransportNodesListShardStoreMetadata.StoreFilesMetadata storeFilesMetadata + TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore, + TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata ) { long sizeMatched = 0; for (StoreFileMetadata storeFileMetadata : storeFilesMetadata) { @@ -456,8 +456,8 @@ private static long computeMatchingBytes( } private static boolean hasMatchingSyncId( - TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore, - TransportNodesListShardStoreMetadata.StoreFilesMetadata replicaStore + TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore, + TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata replicaStore ) { String primarySyncId = primaryStore.syncId(); return primarySyncId != null && primarySyncId.equals(replicaStore.syncId()); @@ -465,9 +465,9 @@ private static boolean hasMatchingSyncId( private static MatchingNode computeMatchingNode( DiscoveryNode primaryNode, - TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore, + TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore, DiscoveryNode replicaNode, - TransportNodesListShardStoreMetadata.StoreFilesMetadata replicaStore + TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata replicaStore ) { final long retainingSeqNoForPrimary = primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(primaryNode); final long retainingSeqNoForReplica = primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(replicaNode); @@ -478,7 +478,7 @@ private static MatchingNode computeMatchingNode( } private static boolean canPerformOperationBasedRecovery( - TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore, + TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata primaryStore, AsyncShardFetch.FetchResult shardStores, DiscoveryNode targetNode ) { diff --git a/server/src/main/java/org/opensearch/indices/store/TransportNodesBatchListShardStoreMetadata.java b/server/src/main/java/org/opensearch/indices/store/TransportNodesBatchListShardStoreMetadata.java new file mode 100644 index 0000000000000..280e7f03eeb34 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/store/TransportNodesBatchListShardStoreMetadata.java @@ -0,0 +1,323 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.store; + +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.OpenSearchException; +import org.opensearch.action.ActionListener; +import org.opensearch.action.ActionType; +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.action.support.nodes.BaseNodesRequest; +import org.opensearch.action.support.nodes.BaseNodesResponse; +import org.opensearch.action.support.nodes.TransportNodesAction; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.settings.Settings; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.gateway.AsyncShardsFetchPerNode; +import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.IndicesService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * Transport action for fetching the batch of shard stores Metadata from a list of transport nodes + * + * @opensearch.internal + */ +public class TransportNodesBatchListShardStoreMetadata extends TransportNodesAction< + TransportNodesBatchListShardStoreMetadata.Request, + TransportNodesBatchListShardStoreMetadata.NodesStoreFilesMetadata, + TransportNodesBatchListShardStoreMetadata.NodeRequest, + TransportNodesBatchListShardStoreMetadata.NodeStoreFilesMetadataBatch> implements + AsyncShardsFetchPerNode.Lister< + TransportNodesBatchListShardStoreMetadata.NodesStoreFilesMetadata, + TransportNodesBatchListShardStoreMetadata.NodeStoreFilesMetadataBatch> { + + public static final String ACTION_NAME = "internal:cluster/nodes/indices/shard/store/batch"; + public static final ActionType TYPE = new ActionType<>(ACTION_NAME, TransportNodesListShardStoreMetadata.NodesStoreFilesMetadata::new); + + private final Settings settings; + private final IndicesService indicesService; + private final NodeEnvironment nodeEnv; + + @Inject + public TransportNodesBatchListShardStoreMetadata( + Settings settings, + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + IndicesService indicesService, + NodeEnvironment nodeEnv, + ActionFilters actionFilters + ) { + super( + ACTION_NAME, + threadPool, + clusterService, + transportService, + actionFilters, + Request::new, + NodeRequest::new, + ThreadPool.Names.FETCH_SHARD_STORE, + NodeStoreFilesMetadataBatch.class + ); + this.settings = settings; + this.indicesService = indicesService; + this.nodeEnv = nodeEnv; + } + + @Override + public void list(DiscoveryNode[] nodes, Map shardIdsWithCustomDataPath, ActionListener listener) { + execute(new TransportNodesBatchListShardStoreMetadata.Request(shardIdsWithCustomDataPath, nodes), listener); + } + + @Override + protected NodeRequest newNodeRequest(Request request) { + return new NodeRequest(request); + } + + @Override + protected NodeStoreFilesMetadataBatch newNodeResponse(StreamInput in) throws IOException { + return new NodeStoreFilesMetadataBatch(in); + } + + @Override + protected NodesStoreFilesMetadata newResponse( + Request request, + List responses, + List failures + ) { + return new NodesStoreFilesMetadata(clusterService.getClusterName(), responses, failures); + } + + @Override + protected NodeStoreFilesMetadataBatch nodeOperation(NodeRequest request) { + try { + return new NodeStoreFilesMetadataBatch(clusterService.localNode(), listStoreMetadata(request)); + } catch (IOException e) { + throw new OpenSearchException("Failed to list store metadata for shards [" + request.getShardIdsWithCustomDataPath() + "]", e); + } + } + + /** + * This method is similar to listStoreMetadata method of {@link TransportNodesListShardStoreMetadata} + * In this case we fetch the shard store files for batch of shards instead of one shard. + */ + private Map listStoreMetadata(NodeRequest request) throws IOException { + Map shardStoreMetadataMap = new HashMap(); + for(Map.Entry shardToCustomDataPathEntry: request.getShardIdsWithCustomDataPath().entrySet()) { + final ShardId shardId = shardToCustomDataPathEntry.getKey(); + try { + logger.debug("Listing store meta data for {}", shardId); + TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata nodeStoreFilesMetadata = TransportNodesListShardStoreMetadataHelper.getStoreFilesMetadata( + logger, + indicesService, + clusterService, + shardId, + shardToCustomDataPathEntry.getValue(), + settings, + nodeEnv + ); + shardStoreMetadataMap.put( + shardId, + new NodeStoreFilesMetadata( + nodeStoreFilesMetadata, + null + ) + ); + } catch (Exception storeFileFetchException) { + logger.trace(new ParameterizedMessage("Unable to fetch store files for shard [{}]", shardId), storeFileFetchException); + shardStoreMetadataMap.put( + shardId, + new NodeStoreFilesMetadata( + null, + storeFileFetchException + ) + ); + } + } + logger.debug("Loaded store meta data for {} shards", shardStoreMetadataMap); + return shardStoreMetadataMap; + } + + + /** + * The request + * + * @opensearch.internal + */ + public static class Request extends BaseNodesRequest { + + private final Map shardIdsWithCustomDataPath; + + + public Request(StreamInput in) throws IOException { + super(in); + shardIdsWithCustomDataPath = in.readMap(ShardId::new, StreamInput::readString); + } + + public Request(Map shardIdsWithCustomDataPath, DiscoveryNode[] nodes) { + super(nodes); + this.shardIdsWithCustomDataPath = Objects.requireNonNull(shardIdsWithCustomDataPath); + } + + public Map getShardIdsWithCustomDataPath() { + return shardIdsWithCustomDataPath; + } + + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeMap(shardIdsWithCustomDataPath, (o, k) -> k.writeTo(o), StreamOutput::writeString); + } + } + + /** + * Metadata for the nodes store files + * + * @opensearch.internal + */ + public static class NodesStoreFilesMetadata extends BaseNodesResponse { + + public NodesStoreFilesMetadata(StreamInput in) throws IOException { + super(in); + } + + public NodesStoreFilesMetadata(ClusterName clusterName, List nodes, List failures) { + super(clusterName, nodes, failures); + } + + @Override + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readList(NodeStoreFilesMetadataBatch::new); + } + + @Override + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeList(nodes); + } + } + + /** + * The metadata for the node store files + * + * @opensearch.internal + */ + public static class NodeStoreFilesMetadata { + + private TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata; + private Exception storeFileFetchException; + + public NodeStoreFilesMetadata(TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata) { + this.storeFilesMetadata = storeFilesMetadata; + this.storeFileFetchException = null; + } + + public NodeStoreFilesMetadata(StreamInput in) throws IOException { + storeFilesMetadata = new TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata(in); + this.storeFileFetchException = null; + } + + public NodeStoreFilesMetadata(TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata, Exception storeFileFetchException) { + this.storeFilesMetadata = storeFilesMetadata; + this.storeFileFetchException = storeFileFetchException; + } + + public TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata() { + return storeFilesMetadata; + } + + public static NodeStoreFilesMetadata readListShardStoreNodeOperationResponse(StreamInput in) throws IOException { + return new NodeStoreFilesMetadata(in); + } + + public void writeTo(StreamOutput out) throws IOException { + storeFilesMetadata.writeTo(out); + } + + public Exception getStoreFileFetchException() { + return storeFileFetchException; + } + + @Override + public String toString() { + return "[[" + storeFilesMetadata + "]]"; + } + } + + /** + * The node request + * + * @opensearch.internal + */ + public static class NodeRequest extends TransportRequest { + + private final Map shardIdsWithCustomDataPath; + + public NodeRequest(StreamInput in) throws IOException { + super(in); + shardIdsWithCustomDataPath = in.readMap(ShardId::new, StreamInput::readString); + } + + public NodeRequest(Request request) { + this.shardIdsWithCustomDataPath = Objects.requireNonNull(request.getShardIdsWithCustomDataPath()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeMap(shardIdsWithCustomDataPath, (o, k) -> k.writeTo(o), StreamOutput::writeString); + } + + public Map getShardIdsWithCustomDataPath() { + return shardIdsWithCustomDataPath; + } + } + + + public static class NodeStoreFilesMetadataBatch extends BaseNodeResponse { + private final Map nodeStoreFilesMetadataBatch; + protected NodeStoreFilesMetadataBatch(StreamInput in) throws IOException { + super(in); + this.nodeStoreFilesMetadataBatch = in.readMap(ShardId::new, NodeStoreFilesMetadata::new); + } + + + public NodeStoreFilesMetadataBatch(DiscoveryNode node, Map nodeStoreFilesMetadataBatch) { + super(node); + this.nodeStoreFilesMetadataBatch = nodeStoreFilesMetadataBatch; + } + + public Map getNodeStoreFilesMetadataBatch() { + return this.nodeStoreFilesMetadataBatch; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeMap(nodeStoreFilesMetadataBatch, (o, k) -> k.writeTo(o), (o, v) -> v.writeTo(o)); + } + } + +} diff --git a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java index bdb0d99fa93b0..0e5d79ecef368 100644 --- a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java +++ b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java @@ -32,7 +32,6 @@ package org.opensearch.indices.store; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionType; @@ -43,38 +42,24 @@ import org.opensearch.action.support.nodes.BaseNodesResponse; import org.opensearch.action.support.nodes.TransportNodesAction; import org.opensearch.cluster.ClusterName; -import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.common.inject.Inject; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; -import org.opensearch.common.io.stream.Writeable; import org.opensearch.common.settings.Settings; -import org.opensearch.common.unit.TimeValue; import org.opensearch.env.NodeEnvironment; import org.opensearch.gateway.AsyncShardFetch; -import org.opensearch.index.IndexService; -import org.opensearch.index.IndexSettings; -import org.opensearch.index.seqno.ReplicationTracker; -import org.opensearch.index.seqno.RetentionLease; -import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; -import org.opensearch.index.shard.ShardPath; -import org.opensearch.index.store.Store; -import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.IndicesService; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportRequest; import org.opensearch.transport.TransportService; import java.io.IOException; -import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.Objects; -import java.util.concurrent.TimeUnit; /** * Metadata for shard stores from a list of transport nodes @@ -157,169 +142,20 @@ protected NodeStoreFilesMetadata nodeOperation(NodeRequest request) { } } - private StoreFilesMetadata listStoreMetadata(NodeRequest request) throws IOException { + private TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata listStoreMetadata(NodeRequest request) throws IOException { final ShardId shardId = request.getShardId(); logger.trace("listing store meta data for {}", shardId); - long startTimeNS = System.nanoTime(); - boolean exists = false; - try { - IndexService indexService = indicesService.indexService(shardId.getIndex()); - if (indexService != null) { - IndexShard indexShard = indexService.getShardOrNull(shardId.id()); - if (indexShard != null) { - try { - final StoreFilesMetadata storeFilesMetadata = new StoreFilesMetadata( - shardId, - indexShard.snapshotStoreMetadata(), - indexShard.getPeerRecoveryRetentionLeases() - ); - exists = true; - return storeFilesMetadata; - } catch (org.apache.lucene.index.IndexNotFoundException e) { - logger.trace(new ParameterizedMessage("[{}] node is missing index, responding with empty", shardId), e); - return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()); - } catch (IOException e) { - logger.warn(new ParameterizedMessage("[{}] can't read metadata from store, responding with empty", shardId), e); - return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()); - } - } - } - final String customDataPath; - if (request.getCustomDataPath() != null) { - customDataPath = request.getCustomDataPath(); - } else { - // TODO: Fallback for BWC with older predecessor (ES) versions. - // Remove this once request.getCustomDataPath() always returns non-null - if (indexService != null) { - customDataPath = indexService.getIndexSettings().customDataPath(); - } else { - IndexMetadata metadata = clusterService.state().metadata().index(shardId.getIndex()); - if (metadata != null) { - customDataPath = new IndexSettings(metadata, settings).customDataPath(); - } else { - logger.trace("{} node doesn't have meta data for the requests index", shardId); - throw new OpenSearchException("node doesn't have meta data for index " + shardId.getIndex()); - } - } - } - final ShardPath shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, customDataPath); - if (shardPath == null) { - return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()); - } - // note that this may fail if it can't get access to the shard lock. Since we check above there is an active shard, this means: - // 1) a shard is being constructed, which means the cluster-manager will not use a copy of this replica - // 2) A shard is shutting down and has not cleared it's content within lock timeout. In this case the cluster-manager may not - // reuse local resources. - final Store.MetadataSnapshot metadataSnapshot = Store.readMetadataSnapshot( - shardPath.resolveIndex(), - shardId, - nodeEnv::shardLock, - logger - ); - // We use peer recovery retention leases from the primary for allocating replicas. We should always have retention leases when - // we refresh shard info after the primary has started. Hence, we can ignore retention leases if there is no active shard. - return new StoreFilesMetadata(shardId, metadataSnapshot, Collections.emptyList()); - } finally { - TimeValue took = new TimeValue(System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS); - if (exists) { - logger.debug("{} loaded store meta data (took [{}])", shardId, took); - } else { - logger.trace("{} didn't find any store meta data to load (took [{}])", shardId, took); - } - } + return TransportNodesListShardStoreMetadataHelper.getStoreFilesMetadata( + logger, + indicesService, + clusterService, + shardId, + request.getCustomDataPath(), + settings, + nodeEnv + ); } - /** - * Metadata for store files - * - * @opensearch.internal - */ - public static class StoreFilesMetadata implements Iterable, Writeable { - private final ShardId shardId; - private final Store.MetadataSnapshot metadataSnapshot; - private final List peerRecoveryRetentionLeases; - - public StoreFilesMetadata( - ShardId shardId, - Store.MetadataSnapshot metadataSnapshot, - List peerRecoveryRetentionLeases - ) { - this.shardId = shardId; - this.metadataSnapshot = metadataSnapshot; - this.peerRecoveryRetentionLeases = peerRecoveryRetentionLeases; - } - - public StoreFilesMetadata(StreamInput in) throws IOException { - this.shardId = new ShardId(in); - this.metadataSnapshot = new Store.MetadataSnapshot(in); - this.peerRecoveryRetentionLeases = in.readList(RetentionLease::new); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - shardId.writeTo(out); - metadataSnapshot.writeTo(out); - out.writeList(peerRecoveryRetentionLeases); - } - - public ShardId shardId() { - return this.shardId; - } - - public boolean isEmpty() { - return metadataSnapshot.size() == 0; - } - - @Override - public Iterator iterator() { - return metadataSnapshot.iterator(); - } - - public boolean fileExists(String name) { - return metadataSnapshot.asMap().containsKey(name); - } - - public StoreFileMetadata file(String name) { - return metadataSnapshot.asMap().get(name); - } - - /** - * Returns the retaining sequence number of the peer recovery retention lease for a given node if exists; otherwise, returns -1. - */ - public long getPeerRecoveryRetentionLeaseRetainingSeqNo(DiscoveryNode node) { - assert node != null; - final String retentionLeaseId = ReplicationTracker.getPeerRecoveryRetentionLeaseId(node.getId()); - return peerRecoveryRetentionLeases.stream() - .filter(lease -> lease.id().equals(retentionLeaseId)) - .mapToLong(RetentionLease::retainingSequenceNumber) - .findFirst() - .orElse(-1L); - } - - public List peerRecoveryRetentionLeases() { - return peerRecoveryRetentionLeases; - } - - /** - * @return commit sync id if exists, else null - */ - public String syncId() { - return metadataSnapshot.getSyncId(); - } - - @Override - public String toString() { - return "StoreFilesMetadata{" - + ", shardId=" - + shardId - + ", metadataSnapshot{size=" - + metadataSnapshot.size() - + ", syncId=" - + metadataSnapshot.getSyncId() - + "}" - + '}'; - } - } /** * The request @@ -444,19 +280,19 @@ public String getCustomDataPath() { */ public static class NodeStoreFilesMetadata extends BaseNodeResponse { - private StoreFilesMetadata storeFilesMetadata; + private TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata; public NodeStoreFilesMetadata(StreamInput in) throws IOException { super(in); - storeFilesMetadata = new StoreFilesMetadata(in); + storeFilesMetadata = new TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata(in); } - public NodeStoreFilesMetadata(DiscoveryNode node, StoreFilesMetadata storeFilesMetadata) { + public NodeStoreFilesMetadata(DiscoveryNode node, TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata) { super(node); this.storeFilesMetadata = storeFilesMetadata; } - public StoreFilesMetadata storeFilesMetadata() { + public TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata storeFilesMetadata() { return storeFilesMetadata; } diff --git a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java new file mode 100644 index 0000000000000..4404a728a8f1c --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataHelper.java @@ -0,0 +1,230 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.store; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.OpenSearchException; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.index.IndexService; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.seqno.ReplicationTracker; +import org.opensearch.index.seqno.RetentionLease; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardId; +import org.opensearch.index.shard.ShardPath; +import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.IndicesService; + +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Helper class for common code used in {@link TransportNodesListShardStoreMetadata} and + * {@link TransportNodesBatchListShardStoreMetadata} which are used to list the store metadata of a shard + * on a set of nodes. + * + * @opensearch.internal + */ +public class TransportNodesListShardStoreMetadataHelper { + /** + * Metadata for store files + * + * @opensearch.internal + */ + public static class StoreFilesMetadata implements Iterable, Writeable { + private final ShardId shardId; + private final Store.MetadataSnapshot metadataSnapshot; + private final List peerRecoveryRetentionLeases; + + public StoreFilesMetadata( + ShardId shardId, + Store.MetadataSnapshot metadataSnapshot, + List peerRecoveryRetentionLeases + ) { + this.shardId = shardId; + this.metadataSnapshot = metadataSnapshot; + this.peerRecoveryRetentionLeases = peerRecoveryRetentionLeases; + } + + public StoreFilesMetadata(StreamInput in) throws IOException { + this.shardId = new ShardId(in); + this.metadataSnapshot = new Store.MetadataSnapshot(in); + this.peerRecoveryRetentionLeases = in.readList(RetentionLease::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + shardId.writeTo(out); + metadataSnapshot.writeTo(out); + out.writeList(peerRecoveryRetentionLeases); + } + + public ShardId shardId() { + return this.shardId; + } + + public boolean isEmpty() { + return metadataSnapshot.size() == 0; + } + + @Override + public Iterator iterator() { + return metadataSnapshot.iterator(); + } + + public boolean fileExists(String name) { + return metadataSnapshot.asMap().containsKey(name); + } + + public StoreFileMetadata file(String name) { + return metadataSnapshot.asMap().get(name); + } + + /** + * Returns the retaining sequence number of the peer recovery retention lease for a given node if exists; otherwise, returns -1. + */ + public long getPeerRecoveryRetentionLeaseRetainingSeqNo(DiscoveryNode node) { + assert node != null; + final String retentionLeaseId = ReplicationTracker.getPeerRecoveryRetentionLeaseId(node.getId()); + return peerRecoveryRetentionLeases.stream() + .filter(lease -> lease.id().equals(retentionLeaseId)) + .mapToLong(RetentionLease::retainingSequenceNumber) + .findFirst() + .orElse(-1L); + } + + public List peerRecoveryRetentionLeases() { + return peerRecoveryRetentionLeases; + } + + /** + * @return commit sync id if exists, else null + */ + public String syncId() { + return metadataSnapshot.getSyncId(); + } + + @Override + public String toString() { + return "StoreFilesMetadata{" + + ", shardId=" + + shardId + + ", metadataSnapshot{size=" + + metadataSnapshot.size() + + ", syncId=" + + metadataSnapshot.getSyncId() + + "}" + + '}'; + } + } + + /** + * Function to fetch metadata of the store files on the local node. + * @param logger + * @param indicesService + * @param clusterService + * @param shardId + * @param shardDataPathInRequest + * @param settings + * @param nodeEnv + * @return + * @throws IOException + */ + public static StoreFilesMetadata getStoreFilesMetadata( + Logger logger, + IndicesService indicesService, + ClusterService clusterService, + ShardId shardId, + String shardDataPathInRequest, + Settings settings, + NodeEnvironment nodeEnv + ) throws IOException { + boolean exists = false; + long startTimeNS = System.nanoTime(); + try { + IndexService indexService = indicesService.indexService(shardId.getIndex()); + if (indexService != null) { + IndexShard indexShard = indexService.getShardOrNull(shardId.id()); + if (indexShard != null) { + try { + final StoreFilesMetadata storeFilesMetadata = new StoreFilesMetadata( + shardId, + indexShard.snapshotStoreMetadata(), + indexShard.getPeerRecoveryRetentionLeases() + ); + exists = true; + return storeFilesMetadata; + } catch (org.apache.lucene.index.IndexNotFoundException e) { + logger.trace(new ParameterizedMessage("[{}] node is missing index, responding with empty", shardId), e); + return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()); + } catch (IOException e) { + logger.warn(new ParameterizedMessage("[{}] can't read metadata from store, responding with empty", shardId), e); + return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()); + } + } + } + final String customDataPath; + if (shardDataPathInRequest != null) { + customDataPath = shardDataPathInRequest; + } else { + // TODO: Fallback for BWC with older predecessor (ES) versions. + // Remove this once request.getCustomDataPath() always returns non-null + if (indexService != null) { + customDataPath = indexService.getIndexSettings().customDataPath(); + } else { + IndexMetadata metadata = clusterService.state().metadata().index(shardId.getIndex()); + if (metadata != null) { + customDataPath = new IndexSettings(metadata, settings).customDataPath(); + } else { + logger.trace("{} node doesn't have meta data for the requests index", shardId); + throw new OpenSearchException("node doesn't have meta data for index " + shardId.getIndex()); + } + } + } + final ShardPath shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, customDataPath); + if (shardPath == null) { + return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()); + } + // note that this may fail if it can't get access to the shard lock. Since we check above there is an active shard, this means: + // 1) a shard is being constructed, which means the cluster-manager will not use a copy of this replica + // 2) A shard is shutting down and has not cleared it's content within lock timeout. In this case the cluster-manager may not + // reuse local resources. + final Store.MetadataSnapshot metadataSnapshot = Store.readMetadataSnapshot( + shardPath.resolveIndex(), + shardId, + nodeEnv::shardLock, + logger + ); + // We use peer recovery retention leases from the primary for allocating replicas. We should always have retention leases when + // we refresh shard info after the primary has started. Hence, we can ignore retention leases if there is no active shard. + return new StoreFilesMetadata(shardId, metadataSnapshot, Collections.emptyList()); + } finally { + TimeValue took = new TimeValue(System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS); + if (exists) { + logger.debug("{} loaded store meta data (took [{}])", shardId, took); + } else { + logger.trace("{} didn't find any store meta data to load (took [{}])", shardId, took); + } + } + } + +} diff --git a/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java index 36ac93524d6aa..1e33eca425e87 100644 --- a/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java @@ -33,9 +33,11 @@ package org.opensearch.gateway; import com.carrotsearch.randomizedtesting.generators.RandomPicks; +import org.junit.Before; import org.opensearch.Version; import org.opensearch.cluster.ClusterInfo; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.OpenSearchAllocationTestCase; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; @@ -66,9 +68,8 @@ import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; -import org.opensearch.cluster.OpenSearchAllocationTestCase; +import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper; import org.opensearch.snapshots.SnapshotShardSizeInfo; -import org.junit.Before; import java.util.ArrayList; import java.util.Arrays; @@ -664,7 +665,7 @@ static String randomSyncId() { class TestAllocator extends ReplicaShardAllocator { - private Map data = null; + private Map data = null; private AtomicBoolean fetchDataCalled = new AtomicBoolean(false); public void clean() { @@ -702,7 +703,7 @@ TestAllocator addData( } data.put( node, - new TransportNodesListShardStoreMetadata.StoreFilesMetadata( + new TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata( shardId, new Store.MetadataSnapshot(unmodifiableMap(filesAsMap), unmodifiableMap(commitData), randomInt()), peerRecoveryRetentionLeases @@ -720,7 +721,7 @@ protected AsyncShardFetch.FetchResult tData = null; if (data != null) { tData = new HashMap<>(); - for (Map.Entry entry : data.entrySet()) { + for (Map.Entry entry : data.entrySet()) { tData.put( entry.getKey(), new TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata(entry.getKey(), entry.getValue()) diff --git a/server/src/test/java/org/opensearch/index/store/StoreTests.java b/server/src/test/java/org/opensearch/index/store/StoreTests.java index b11e8554027b1..0e9403255fa66 100644 --- a/server/src/test/java/org/opensearch/index/store/StoreTests.java +++ b/server/src/test/java/org/opensearch/index/store/StoreTests.java @@ -86,7 +86,7 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.replication.common.ReplicationType; -import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; +import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper; import org.opensearch.test.DummyShardLock; import org.opensearch.test.FeatureFlagSetter; import org.opensearch.test.IndexSettingsModule; @@ -958,8 +958,8 @@ public void testStreamStoreFilesMetadata() throws Exception { ) ); } - TransportNodesListShardStoreMetadata.StoreFilesMetadata outStoreFileMetadata = - new TransportNodesListShardStoreMetadata.StoreFilesMetadata( + TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata outStoreFileMetadata = + new TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata( new ShardId("test", "_na_", 0), metadataSnapshot, peerRecoveryRetentionLeases @@ -972,8 +972,8 @@ public void testStreamStoreFilesMetadata() throws Exception { ByteArrayInputStream inBuffer = new ByteArrayInputStream(outBuffer.toByteArray()); InputStreamStreamInput in = new InputStreamStreamInput(inBuffer); in.setVersion(targetNodeVersion); - TransportNodesListShardStoreMetadata.StoreFilesMetadata inStoreFileMetadata = - new TransportNodesListShardStoreMetadata.StoreFilesMetadata(in); + TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata inStoreFileMetadata = + new TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata(in); Iterator outFiles = outStoreFileMetadata.iterator(); for (StoreFileMetadata inFile : inStoreFileMetadata) { assertThat(inFile.name(), equalTo(outFiles.next().name()));