From daeed6616930d86b9bf2861bac5253f4cd5b6c34 Mon Sep 17 00:00:00 2001 From: Himshikha Gupta Date: Tue, 11 Jun 2024 14:38:34 +0530 Subject: [PATCH] [Backport 2.x][Remote Routing Table] Add write flow for remote routing table (#13870) (#14160) * [Remote Routing Table] Add write flow for remote routing table (#13870) * Introduce RemoteRoutingTableService for shard routing table management Signed-off-by: Himshikha Gupta Co-authored-by: Bukhtawar Khan Co-authored-by: Arpit Bandejiya Signed-off-by: kkewwei --- CHANGELOG.md | 3 + .../routing/IndexShardRoutingTable.java | 4 +- .../InternalRemoteRoutingTableService.java | 303 +++++++++++ .../remote/NoopRemoteRoutingTableService.java | 77 +++ .../remote/RemoteRoutingTableService.java | 87 ++-- .../RemoteRoutingTableServiceFactory.java | 41 ++ .../opensearch/common/blobstore/BlobPath.java | 14 + .../common/settings/ClusterSettings.java | 3 + .../remote/ClusterMetadataManifest.java | 30 +- .../remote/RemoteClusterStateService.java | 98 +++- .../index/remote/RemoteIndexPath.java | 3 +- .../index/remote/RemoteStoreEnums.java | 69 ++- .../index/remote/RemoteStorePathStrategy.java | 129 ++++- .../routing/IndexShardRoutingTableTests.java | 6 + ...RemoteRoutingTableServiceFactoryTests.java | 51 ++ .../RemoteRoutingTableServiceTests.java | 484 +++++++++++++++++- .../remote/ClusterMetadataManifestTests.java | 10 +- .../RemoteClusterStateServiceTests.java | 177 ++++++- .../index/remote/RemoteStoreEnumsTests.java | 76 +-- .../remote/RemoteStorePathStrategyTests.java | 87 ++++ 20 files changed, 1535 insertions(+), 217 deletions(-) create mode 100644 server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java create mode 100644 server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java create mode 100644 server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java create mode 100644 server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java create mode 100644 server/src/test/java/org/opensearch/index/remote/RemoteStorePathStrategyTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index aacdc38089f3c..e3505513d8453 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Allow setting query parameters on requests ([#13776](https://github.com/opensearch-project/OpenSearch/issues/13776)) - Add dynamic action retry timeout setting ([#14022](https://github.com/opensearch-project/OpenSearch/issues/14022)) - Add capability to disable source recovery_source for an index ([#13590](https://github.com/opensearch-project/OpenSearch/pull/13590)) +- Add remote routing table for remote state publication with experimental feature flag ([#13304](https://github.com/opensearch-project/OpenSearch/pull/13304)) +- Add upload flow for writing routing table to remote store ([#13870](https://github.com/opensearch-project/OpenSearch/pull/13870)) +- Add dynamic action retry timeout setting ([#14022](https://github.com/opensearch-project/OpenSearch/issues/14022)) - [Remote Store] Add support to disable flush based on translog reader count ([#14027](https://github.com/opensearch-project/OpenSearch/pull/14027)) - Add recovery chunk size setting ([#13997](https://github.com/opensearch-project/OpenSearch/pull/13997)) - [Query Insights] Add exporter support for top n queries ([#12982](https://github.com/opensearch-project/OpenSearch/pull/12982)) diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java index fd8cbea42c12f..479143fa9a2f0 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java @@ -738,9 +738,7 @@ public boolean equals(Object o) { IndexShardRoutingTable that = (IndexShardRoutingTable) o; if (!shardId.equals(that.shardId)) return false; - if (!shards.equals(that.shards)) return false; - - return true; + return shards.size() == that.shards.size() && shards.containsAll(that.shards) && that.shards.containsAll(shards); } @Override diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java new file mode 100644 index 0000000000000..01cd5c0b89a7d --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java @@ -0,0 +1,303 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.remote; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.IndexInput; +import org.opensearch.action.LatchedActionListener; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.DiffableUtils; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.common.CheckedRunnable; +import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; +import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.common.lucene.store.ByteArrayIndexInput; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.io.IOUtils; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.RemoteClusterStateService; +import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable; +import org.opensearch.index.remote.RemoteStoreEnums; +import org.opensearch.index.remote.RemoteStorePathStrategy; +import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.node.Node; +import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.blobstore.BlobStoreRepository; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; + +/** + * A Service which provides APIs to upload and download routing table from remote store. + * + * @opensearch.internal + */ +public class InternalRemoteRoutingTableService extends AbstractLifecycleComponent implements RemoteRoutingTableService { + + /** + * This setting is used to set the remote routing table store blob store path type strategy. + */ + public static final Setting REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING = new Setting<>( + "cluster.remote_store.routing_table.path_type", + RemoteStoreEnums.PathType.HASHED_PREFIX.toString(), + RemoteStoreEnums.PathType::parseString, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + /** + * This setting is used to set the remote routing table store blob store path hash algorithm strategy. + * This setting will come to effect if the {@link #REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING} + * is either {@code HASHED_PREFIX} or {@code HASHED_INFIX}. + */ + public static final Setting REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING = new Setting<>( + "cluster.remote_store.routing_table.path_hash_algo", + RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64.toString(), + RemoteStoreEnums.PathHashAlgorithm::parseString, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing"; + public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing"; + public static final String DELIMITER = "__"; + public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--"; + + private static final Logger logger = LogManager.getLogger(InternalRemoteRoutingTableService.class); + private final Settings settings; + private final Supplier repositoriesService; + private BlobStoreRepository blobStoreRepository; + private RemoteStoreEnums.PathType pathType; + private RemoteStoreEnums.PathHashAlgorithm pathHashAlgo; + + public InternalRemoteRoutingTableService( + Supplier repositoriesService, + Settings settings, + ClusterSettings clusterSettings + ) { + assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled"; + this.repositoriesService = repositoriesService; + this.settings = settings; + this.pathType = clusterSettings.get(REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING); + this.pathHashAlgo = clusterSettings.get(REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING); + clusterSettings.addSettingsUpdateConsumer(REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING, this::setPathTypeSetting); + clusterSettings.addSettingsUpdateConsumer(REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING, this::setPathHashAlgoSetting); + } + + private void setPathTypeSetting(RemoteStoreEnums.PathType pathType) { + this.pathType = pathType; + } + + private void setPathHashAlgoSetting(RemoteStoreEnums.PathHashAlgorithm pathHashAlgo) { + this.pathHashAlgo = pathHashAlgo; + } + + public List getIndicesRouting(RoutingTable routingTable) { + return new ArrayList<>(routingTable.indicesRouting().values()); + } + + /** + * Returns diff between the two routing tables, which includes upserts and deletes. + * @param before previous routing table + * @param after current routing table + * @return diff of the previous and current routing table + */ + public DiffableUtils.MapDiff> getIndicesRoutingMapDiff( + RoutingTable before, + RoutingTable after + ) { + return DiffableUtils.diff( + before.getIndicesRouting(), + after.getIndicesRouting(), + DiffableUtils.getStringKeySerializer(), + CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER + ); + } + + /** + * Create async action for writing one {@code IndexRoutingTable} to remote store + * @param clusterState current cluster state + * @param indexRouting indexRoutingTable to write to remote store + * @param latchedActionListener listener for handling async action response + * @param clusterBasePath base path for remote file + * @return returns runnable async action + */ + public CheckedRunnable getIndexRoutingAsyncAction( + ClusterState clusterState, + IndexRoutingTable indexRouting, + LatchedActionListener latchedActionListener, + BlobPath clusterBasePath + ) { + + BlobPath indexRoutingPath = clusterBasePath.add(INDEX_ROUTING_PATH_TOKEN); + BlobPath path = pathType.path( + RemoteStorePathStrategy.BasePathInput.builder().basePath(indexRoutingPath).indexUUID(indexRouting.getIndex().getUUID()).build(), + pathHashAlgo + ); + final BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(path); + + final String fileName = getIndexRoutingFileName(clusterState.term(), clusterState.version()); + + ActionListener completionListener = ActionListener.wrap( + resp -> latchedActionListener.onResponse( + new ClusterMetadataManifest.UploadedIndexMetadata( + indexRouting.getIndex().getName(), + indexRouting.getIndex().getUUID(), + path.buildAsString() + fileName, + INDEX_ROUTING_METADATA_PREFIX + ) + ), + ex -> latchedActionListener.onFailure( + new RemoteClusterStateService.RemoteStateTransferException( + "Exception in writing index to remote store: " + indexRouting.getIndex().toString(), + ex + ) + ) + ); + + return () -> uploadIndex(indexRouting, fileName, blobContainer, completionListener); + } + + /** + * Combines IndicesRoutingMetadata from previous manifest and current uploaded indices, removes deleted indices. + * @param previousManifest previous manifest, used to get all existing indices routing paths + * @param indicesRoutingUploaded current uploaded indices routings + * @param indicesRoutingToDelete indices to delete + * @return combined list of metadata + */ + public List getAllUploadedIndicesRouting( + ClusterMetadataManifest previousManifest, + List indicesRoutingUploaded, + List indicesRoutingToDelete + ) { + final Map allUploadedIndicesRouting = previousManifest.getIndicesRouting() + .stream() + .collect(Collectors.toMap(ClusterMetadataManifest.UploadedIndexMetadata::getIndexName, Function.identity())); + + indicesRoutingUploaded.forEach( + uploadedIndexRouting -> allUploadedIndicesRouting.put(uploadedIndexRouting.getIndexName(), uploadedIndexRouting) + ); + indicesRoutingToDelete.forEach(allUploadedIndicesRouting::remove); + + return new ArrayList<>(allUploadedIndicesRouting.values()); + } + + private void uploadIndex( + IndexRoutingTable indexRouting, + String fileName, + BlobContainer blobContainer, + ActionListener completionListener + ) { + RemoteIndexRoutingTable indexRoutingInput = new RemoteIndexRoutingTable(indexRouting); + BytesReference bytesInput = null; + try (BytesStreamOutput streamOutput = new BytesStreamOutput()) { + indexRoutingInput.writeTo(streamOutput); + bytesInput = streamOutput.bytes(); + } catch (IOException e) { + logger.error("Failed to serialize IndexRoutingTable for [{}]: [{}]", indexRouting, e); + completionListener.onFailure(e); + return; + } + + if (blobContainer instanceof AsyncMultiStreamBlobContainer == false) { + try { + blobContainer.writeBlob(fileName, bytesInput.streamInput(), bytesInput.length(), true); + completionListener.onResponse(null); + } catch (IOException e) { + logger.error("Failed to write IndexRoutingTable to remote store for indexRouting [{}]: [{}]", indexRouting, e); + completionListener.onFailure(e); + } + return; + } + + try (IndexInput input = new ByteArrayIndexInput("indexrouting", BytesReference.toBytes(bytesInput))) { + try ( + RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( + fileName, + fileName, + input.length(), + true, + WritePriority.URGENT, + (size, position) -> new OffsetRangeIndexInputStream(input, size, position), + null, + false + ) + ) { + ((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload( + remoteTransferContainer.createWriteContext(), + completionListener + ); + } catch (IOException e) { + logger.error("Failed to write IndexRoutingTable to remote store for indexRouting [{}]: [{}]", indexRouting, e); + completionListener.onFailure(e); + } + } catch (IOException e) { + logger.error( + "Failed to create transfer object for IndexRoutingTable for remote store upload for indexRouting [{}]: [{}]", + indexRouting, + e + ); + completionListener.onFailure(e); + } + } + + private String getIndexRoutingFileName(long term, long version) { + return String.join( + DELIMITER, + INDEX_ROUTING_FILE_PREFIX, + RemoteStoreUtils.invertLong(term), + RemoteStoreUtils.invertLong(version), + RemoteStoreUtils.invertLong(System.currentTimeMillis()) + ); + } + + @Override + protected void doClose() throws IOException { + if (blobStoreRepository != null) { + IOUtils.close(blobStoreRepository); + } + } + + @Override + protected void doStart() { + assert isRemoteRoutingTableEnabled(settings) == true : "Remote routing table is not enabled"; + final String remoteStoreRepo = settings.get( + Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY + ); + assert remoteStoreRepo != null : "Remote routing table repository is not configured"; + final Repository repository = repositoriesService.get().repository(remoteStoreRepo); + assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; + blobStoreRepository = (BlobStoreRepository) repository; + } + + @Override + protected void doStop() {} + +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java new file mode 100644 index 0000000000000..b52c00f1f8576 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java @@ -0,0 +1,77 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.remote; + +import org.opensearch.action.LatchedActionListener; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.DiffableUtils; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.common.CheckedRunnable; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.gateway.remote.ClusterMetadataManifest; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * Noop impl for RemoteRoutingTableService. + */ +public class NoopRemoteRoutingTableService extends AbstractLifecycleComponent implements RemoteRoutingTableService { + + @Override + public List getIndicesRouting(RoutingTable routingTable) { + return List.of(); + } + + @Override + public DiffableUtils.MapDiff> getIndicesRoutingMapDiff( + RoutingTable before, + RoutingTable after + ) { + return DiffableUtils.diff(Map.of(), Map.of(), DiffableUtils.getStringKeySerializer(), CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER); + } + + @Override + public CheckedRunnable getIndexRoutingAsyncAction( + ClusterState clusterState, + IndexRoutingTable indexRouting, + LatchedActionListener latchedActionListener, + BlobPath clusterBasePath + ) { + // noop + return () -> {}; + } + + @Override + public List getAllUploadedIndicesRouting( + ClusterMetadataManifest previousManifest, + List indicesRoutingUploaded, + List indicesRoutingToDelete + ) { + return List.of(); + } + + @Override + protected void doStart() { + // noop + } + + @Override + protected void doStop() { + // noop + } + + @Override + protected void doClose() throws IOException { + // noop + } +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java index ba2208e17df1f..dbf01904116ed 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java @@ -8,60 +8,57 @@ package org.opensearch.cluster.routing.remote; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.common.lifecycle.AbstractLifecycleComponent; -import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.io.IOUtils; -import org.opensearch.node.Node; -import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; -import org.opensearch.repositories.RepositoriesService; -import org.opensearch.repositories.Repository; -import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.action.LatchedActionListener; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.DiffableUtils; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.common.CheckedRunnable; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.lifecycle.LifecycleComponent; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.gateway.remote.ClusterMetadataManifest; import java.io.IOException; -import java.util.function.Supplier; - -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; +import java.util.List; +import java.util.Map; /** - * A Service which provides APIs to upload and download routing table from remote store. - * - * @opensearch.internal + * Interface for RemoteRoutingTableService. Exposes methods to orchestrate upload and download of routing table from remote store. */ -public class RemoteRoutingTableService extends AbstractLifecycleComponent { +public interface RemoteRoutingTableService extends LifecycleComponent { + static final DiffableUtils.NonDiffableValueSerializer CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER = + new DiffableUtils.NonDiffableValueSerializer() { + @Override + public void write(IndexRoutingTable value, StreamOutput out) throws IOException { + value.writeTo(out); + } - private static final Logger logger = LogManager.getLogger(RemoteRoutingTableService.class); - private final Settings settings; - private final Supplier repositoriesService; - private BlobStoreRepository blobStoreRepository; + @Override + public IndexRoutingTable read(StreamInput in, String key) throws IOException { + return IndexRoutingTable.readFrom(in); + } + }; - public RemoteRoutingTableService(Supplier repositoriesService, Settings settings) { - assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled"; - this.repositoriesService = repositoriesService; - this.settings = settings; - } + List getIndicesRouting(RoutingTable routingTable); - @Override - protected void doClose() throws IOException { - if (blobStoreRepository != null) { - IOUtils.close(blobStoreRepository); - } - } + DiffableUtils.MapDiff> getIndicesRoutingMapDiff( + RoutingTable before, + RoutingTable after + ); - @Override - protected void doStart() { - assert isRemoteRoutingTableEnabled(settings) == true : "Remote routing table is not enabled"; - final String remoteStoreRepo = settings.get( - Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY - ); - assert remoteStoreRepo != null : "Remote routing table repository is not configured"; - final Repository repository = repositoriesService.get().repository(remoteStoreRepo); - assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; - blobStoreRepository = (BlobStoreRepository) repository; - } + CheckedRunnable getIndexRoutingAsyncAction( + ClusterState clusterState, + IndexRoutingTable indexRouting, + LatchedActionListener latchedActionListener, + BlobPath clusterBasePath + ); - @Override - protected void doStop() {} + List getAllUploadedIndicesRouting( + ClusterMetadataManifest previousManifest, + List indicesRoutingUploaded, + List indicesRoutingToDelete + ); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java new file mode 100644 index 0000000000000..49f90fa261f27 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.remote; + +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.repositories.RepositoriesService; + +import java.util.function.Supplier; + +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; + +/** + * Factory to provide impl for RemoteRoutingTableService based on settings. + */ +public class RemoteRoutingTableServiceFactory { + + /** + * Returns {@code DefaultRemoteRoutingTableService} if the feature is enabled, otherwise {@code NoopRemoteRoutingTableService} + * @param repositoriesService repositoriesService + * @param settings settings + * @param clusterSettings clusterSettings + * @return RemoteRoutingTableService + */ + public static RemoteRoutingTableService getService( + Supplier repositoriesService, + Settings settings, + ClusterSettings clusterSettings + ) { + if (isRemoteRoutingTableEnabled(settings)) { + return new InternalRemoteRoutingTableService(repositoriesService, settings, clusterSettings); + } + return new NoopRemoteRoutingTableService(); + } +} diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobPath.java b/server/src/main/java/org/opensearch/common/blobstore/BlobPath.java index d64f47516e094..076c49ffa6d45 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobPath.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobPath.java @@ -39,6 +39,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Objects; /** * The list of paths where a blob can reside. The contents of the paths are dependent upon the implementation of {@link BlobContainer}. @@ -110,6 +111,19 @@ public BlobPath parent() { } } + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + BlobPath that = (BlobPath) o; + return Objects.equals(paths, that.paths); + } + + @Override + public int hashCode() { + return Objects.hashCode(paths); + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 9f08d5ec442ac..25288bcb4d268 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -78,6 +78,7 @@ import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; +import org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService; import org.opensearch.cluster.service.ClusterApplierService; import org.opensearch.cluster.service.ClusterManagerService; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; @@ -726,6 +727,8 @@ public void apply(Settings value, Settings current, Settings previous) { IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING, IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING, + InternalRemoteRoutingTableService.REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING, + InternalRemoteRoutingTableService.REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING, AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE, CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index 90c80ccdf78a1..1f588d8ba70ae 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -812,6 +812,7 @@ public static class UploadedIndexMetadata implements UploadedMetadata, Writeable private static final ParseField INDEX_NAME_FIELD = new ParseField("index_name"); private static final ParseField INDEX_UUID_FIELD = new ParseField("index_uuid"); private static final ParseField UPLOADED_FILENAME_FIELD = new ParseField("uploaded_filename"); + private static final ParseField COMPONENT_PREFIX_FIELD = new ParseField("component_prefix"); private static String indexName(Object[] fields) { return (String) fields[0]; @@ -825,23 +826,34 @@ private static String uploadedFilename(Object[] fields) { return (String) fields[2]; } + private static String componentPrefix(Object[] fields) { + return (String) fields[3]; + } + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "uploaded_index_metadata", - fields -> new UploadedIndexMetadata(indexName(fields), indexUUID(fields), uploadedFilename(fields)) + fields -> new UploadedIndexMetadata(indexName(fields), indexUUID(fields), uploadedFilename(fields), componentPrefix(fields)) ); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), INDEX_NAME_FIELD); PARSER.declareString(ConstructingObjectParser.constructorArg(), INDEX_UUID_FIELD); PARSER.declareString(ConstructingObjectParser.constructorArg(), UPLOADED_FILENAME_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), COMPONENT_PREFIX_FIELD); } static final String COMPONENT_PREFIX = "index--"; + private final String componentPrefix; private final String indexName; private final String indexUUID; private final String uploadedFilename; public UploadedIndexMetadata(String indexName, String indexUUID, String uploadedFileName) { + this(indexName, indexUUID, uploadedFileName, COMPONENT_PREFIX); + } + + public UploadedIndexMetadata(String indexName, String indexUUID, String uploadedFileName, String componentPrefix) { + this.componentPrefix = componentPrefix; this.indexName = indexName; this.indexUUID = indexUUID; this.uploadedFilename = uploadedFileName; @@ -851,6 +863,7 @@ public UploadedIndexMetadata(StreamInput in) throws IOException { this.indexName = in.readString(); this.indexUUID = in.readString(); this.uploadedFilename = in.readString(); + this.componentPrefix = in.readString(); } public String getUploadedFilePath() { @@ -859,7 +872,7 @@ public String getUploadedFilePath() { @Override public String getComponent() { - return COMPONENT_PREFIX + getIndexName(); + return componentPrefix + getIndexName(); } public String getUploadedFilename() { @@ -875,11 +888,16 @@ public String getIndexUUID() { return indexUUID; } + public String getComponentPrefix() { + return componentPrefix; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { return builder.field(INDEX_NAME_FIELD.getPreferredName(), getIndexName()) .field(INDEX_UUID_FIELD.getPreferredName(), getIndexUUID()) - .field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilePath()); + .field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilePath()) + .field(COMPONENT_PREFIX_FIELD.getPreferredName(), getComponentPrefix()); } @Override @@ -887,6 +905,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(indexName); out.writeString(indexUUID); out.writeString(uploadedFilename); + out.writeString(componentPrefix); } @Override @@ -900,12 +919,13 @@ public boolean equals(Object o) { final UploadedIndexMetadata that = (UploadedIndexMetadata) o; return Objects.equals(indexName, that.indexName) && Objects.equals(indexUUID, that.indexUUID) - && Objects.equals(uploadedFilename, that.uploadedFilename); + && Objects.equals(uploadedFilename, that.uploadedFilename) + && Objects.equals(componentPrefix, that.componentPrefix); } @Override public int hashCode() { - return Objects.hash(indexName, indexUUID, uploadedFilename); + return Objects.hash(indexName, indexUUID, uploadedFilename, componentPrefix); } @Override diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 240b00e67af0c..81950cb3bcf30 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -14,11 +14,15 @@ import org.opensearch.Version; import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.DiffableUtils; import org.opensearch.cluster.coordination.CoordinationMetadata; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.TemplatesMetadata; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService; import org.opensearch.cluster.routing.remote.RemoteRoutingTableService; +import org.opensearch.cluster.routing.remote.RemoteRoutingTableServiceFactory; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.CheckedRunnable; import org.opensearch.common.Nullable; @@ -71,7 +75,6 @@ import static java.util.Objects.requireNonNull; import static org.opensearch.gateway.PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; /** @@ -168,6 +171,12 @@ public class RemoteClusterStateService implements Closeable { /** * Manifest format compatible with codec v2, where global metadata file is replaced with multiple metadata attribute files */ + public static final ChecksumBlobStoreFormat CLUSTER_METADATA_MANIFEST_FORMAT_V2 = + new ChecksumBlobStoreFormat<>("cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, ClusterMetadataManifest::fromXContentV2); + + /** + * Manifest format compatible with codec v3, where global metadata file is replaced with multiple metadata attribute files + */ public static final ChecksumBlobStoreFormat CLUSTER_METADATA_MANIFEST_FORMAT = new ChecksumBlobStoreFormat<>( "cluster-metadata-manifest", METADATA_MANIFEST_NAME_FORMAT, @@ -204,7 +213,7 @@ public class RemoteClusterStateService implements Closeable { private final List indexMetadataUploadListeners; private BlobStoreRepository blobStoreRepository; private BlobStoreTransferService blobStoreTransferService; - private Optional remoteRoutingTableService; + private final RemoteRoutingTableService remoteRoutingTableService; private volatile TimeValue slowWriteLoggingThreshold; private volatile TimeValue indexMetadataUploadTimeout; @@ -215,7 +224,7 @@ public class RemoteClusterStateService implements Closeable { private final String CLUSTER_STATE_UPLOAD_TIME_LOG_STRING = "writing cluster state for version [{}] took [{}ms]"; private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged " + "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata " - + "updated : [{}], custom metadata updated : [{}]"; + + "updated : [{}], custom metadata updated : [{}], indices routing updated : [{}]"; public static final int INDEX_METADATA_CURRENT_CODEC_VERSION = 1; public static final int MANIFEST_CURRENT_CODEC_VERSION = ClusterMetadataManifest.CODEC_V3; public static final int GLOBAL_METADATA_CURRENT_CODEC_VERSION = 2; @@ -257,9 +266,7 @@ public RemoteClusterStateService( this.remoteStateStats = new RemotePersistenceStats(); this.remoteClusterStateCleanupManager = new RemoteClusterStateCleanupManager(this, clusterService); this.indexMetadataUploadListeners = indexMetadataUploadListeners; - this.remoteRoutingTableService = isRemoteRoutingTableEnabled(settings) - ? Optional.of(new RemoteRoutingTableService(repositoriesService, settings)) - : Optional.empty(); + this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService(repositoriesService, settings, clusterSettings); } /** @@ -283,7 +290,8 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat clusterState.metadata().customs(), true, true, - true + true, + remoteRoutingTableService.getIndicesRouting(clusterState.getRoutingTable()) ); final RemoteClusterStateManifestInfo manifestDetails = uploadManifest( clusterState, @@ -293,6 +301,7 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat uploadedMetadataResults.uploadedSettingsMetadata, uploadedMetadataResults.uploadedTemplatesMetadata, uploadedMetadataResults.uploadedCustomMetadataMap, + uploadedMetadataResults.uploadedIndicesRoutingMetadata, false ); final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); @@ -300,16 +309,19 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat remoteStateStats.stateTook(durationMillis); if (durationMillis >= slowWriteLoggingThreshold.getMillis()) { logger.warn( - "writing cluster state took [{}ms] which is above the warn threshold of [{}]; " + "wrote full state with [{}] indices", + "writing cluster state took [{}ms] which is above the warn threshold of [{}]; " + + "wrote full state with [{}] indices and [{}] indicesRouting", durationMillis, slowWriteLoggingThreshold, - uploadedMetadataResults.uploadedIndexMetadata.size() + uploadedMetadataResults.uploadedIndexMetadata.size(), + uploadedMetadataResults.uploadedIndicesRoutingMetadata.size() ); } else { logger.info( - "writing cluster state took [{}ms]; " + "wrote full state with [{}] indices and global metadata", + "writing cluster state took [{}ms]; " + "wrote full state with [{}] indices, [{}] indicesRouting and global metadata", durationMillis, - uploadedMetadataResults.uploadedIndexMetadata.size() + uploadedMetadataResults.uploadedIndexMetadata.size(), + uploadedMetadataResults.uploadedIndicesRoutingMetadata.size() ); } return manifestDetails; @@ -374,6 +386,12 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( // index present in current cluster state indicesToBeDeletedFromRemote.remove(indexMetadata.getIndex().getName()); } + + DiffableUtils.MapDiff> routingTableDiff = remoteRoutingTableService + .getIndicesRoutingMapDiff(previousClusterState.getRoutingTable(), clusterState.getRoutingTable()); + List indicesRoutingToUpload = new ArrayList<>(); + routingTableDiff.getUpserts().forEach((k, v) -> indicesRoutingToUpload.add(v)); + UploadedMetadataResults uploadedMetadataResults; // For migration case from codec V0 or V1 to V2, we have added null check on metadata attribute files, // If file is empty and codec is 1 then write global metadata. @@ -393,7 +411,8 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( firstUploadForSplitGlobalMetadata ? clusterState.metadata().customs() : customsToUpload, updateCoordinationMetadata, updateSettingsMetadata, - updateTemplatesMetadata + updateTemplatesMetadata, + indicesRoutingToUpload ); // update the map if the metadata was uploaded @@ -405,6 +424,13 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( customsToBeDeletedFromRemote.keySet().forEach(allUploadedCustomMap::remove); indicesToBeDeletedFromRemote.keySet().forEach(allUploadedIndexMetadata::remove); + List allUploadedIndicesRouting = new ArrayList<>(); + allUploadedIndicesRouting = remoteRoutingTableService.getAllUploadedIndicesRouting( + previousManifest, + uploadedMetadataResults.uploadedIndicesRoutingMetadata, + routingTableDiff.getDeletes() + ); + final RemoteClusterStateManifestInfo manifestDetails = uploadManifest( clusterState, new ArrayList<>(allUploadedIndexMetadata.values()), @@ -415,6 +441,7 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( firstUploadForSplitGlobalMetadata || !customsToUpload.isEmpty() ? allUploadedCustomMap : previousManifest.getCustomMetadataMap(), + allUploadedIndicesRouting, false ); @@ -433,7 +460,8 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( updateCoordinationMetadata, updateSettingsMetadata, updateTemplatesMetadata, - customsToUpload.size() + customsToUpload.size(), + indicesRoutingToUpload.size() ); if (durationMillis >= slowWriteLoggingThreshold.getMillis()) { logger.warn( @@ -455,11 +483,13 @@ private UploadedMetadataResults writeMetadataInParallel( Map customToUpload, boolean uploadCoordinationMetadata, boolean uploadSettingsMetadata, - boolean uploadTemplateMetadata + boolean uploadTemplateMetadata, + List indicesRoutingToUpload ) throws IOException { assert Objects.nonNull(indexMetadataUploadListeners) : "indexMetadataUploadListeners can not be null"; int totalUploadTasks = indexToUpload.size() + indexMetadataUploadListeners.size() + customToUpload.size() - + (uploadCoordinationMetadata ? 1 : 0) + (uploadSettingsMetadata ? 1 : 0) + (uploadTemplateMetadata ? 1 : 0); + + (uploadCoordinationMetadata ? 1 : 0) + (uploadSettingsMetadata ? 1 : 0) + (uploadTemplateMetadata ? 1 : 0) + + indicesRoutingToUpload.size(); CountDownLatch latch = new CountDownLatch(totalUploadTasks); Map> uploadTasks = new HashMap<>(totalUploadTasks); Map results = new HashMap<>(totalUploadTasks); @@ -526,6 +556,18 @@ private UploadedMetadataResults writeMetadataInParallel( uploadTasks.put(indexMetadata.getIndex().getName(), getIndexMetadataAsyncAction(clusterState, indexMetadata, listener)); }); + indicesRoutingToUpload.forEach(indexRoutingTable -> { + uploadTasks.put( + InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + indexRoutingTable.getIndex().getName(), + remoteRoutingTableService.getIndexRoutingAsyncAction( + clusterState, + indexRoutingTable, + listener, + getCusterMetadataBasePath(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID()) + ) + ); + }); + // start async upload of all required metadata files for (CheckedRunnable uploadTask : uploadTasks.values()) { uploadTask.run(); @@ -572,7 +614,10 @@ private UploadedMetadataResults writeMetadataInParallel( } UploadedMetadataResults response = new UploadedMetadataResults(); results.forEach((name, uploadedMetadata) -> { - if (name.contains(CUSTOM_METADATA)) { + if (uploadedMetadata.getClass().equals(UploadedIndexMetadata.class) + && uploadedMetadata.getComponent().contains(InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX)) { + response.uploadedIndicesRoutingMetadata.add((UploadedIndexMetadata) uploadedMetadata); + } else if (name.contains(CUSTOM_METADATA)) { // component name for custom metadata will look like custom-- String custom = name.split(DELIMITER)[0].split(CUSTOM_DELIMITER)[1]; response.uploadedCustomMetadataMap.put( @@ -741,6 +786,7 @@ public RemoteClusterStateManifestInfo markLastStateAsCommitted(ClusterState clus previousManifest.getSettingsMetadata(), previousManifest.getTemplatesMetadata(), previousManifest.getCustomMetadataMap(), + previousManifest.getIndicesRouting(), true ); if (!previousManifest.isClusterUUIDCommitted() && committedManifestDetails.getClusterMetadataManifest().isClusterUUIDCommitted()) { @@ -756,9 +802,7 @@ public void close() throws IOException { if (blobStoreRepository != null) { IOUtils.close(blobStoreRepository); } - if (this.remoteRoutingTableService.isPresent()) { - this.remoteRoutingTableService.get().close(); - } + this.remoteRoutingTableService.close(); } public void start() { @@ -771,7 +815,7 @@ public void start() { assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository"; blobStoreRepository = (BlobStoreRepository) repository; remoteClusterStateCleanupManager.start(); - this.remoteRoutingTableService.ifPresent(RemoteRoutingTableService::start); + this.remoteRoutingTableService.start(); } private RemoteClusterStateManifestInfo uploadManifest( @@ -782,6 +826,7 @@ private RemoteClusterStateManifestInfo uploadManifest( UploadedMetadataAttribute uploadedSettingsMetadata, UploadedMetadataAttribute uploadedTemplatesMetadata, Map uploadedCustomMetadataMap, + List uploadedIndicesRouting, boolean committed ) throws IOException { synchronized (this) { @@ -809,8 +854,7 @@ private RemoteClusterStateManifestInfo uploadManifest( uploadedTemplatesMetadata, uploadedCustomMetadataMap, clusterState.routingTable().version(), - // TODO: Add actual list of changed indices routing with index routing upload flow. - new ArrayList<>() + uploadedIndicesRouting ); writeMetadataManifest(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), manifest, manifestFileName); return new RemoteClusterStateManifestInfo(manifest, manifestFileName); @@ -957,7 +1001,7 @@ public TimeValue getMetadataManifestUploadTimeout() { } // Package private for unit test - Optional getRemoteRoutingTableService() { + RemoteRoutingTableService getRemoteRoutingTableService() { return this.remoteRoutingTableService; } @@ -1496,6 +1540,8 @@ private ChecksumBlobStoreFormat getClusterMetadataManif long codecVersion = getManifestCodecVersion(fileName); if (codecVersion == MANIFEST_CURRENT_CODEC_VERSION) { return CLUSTER_METADATA_MANIFEST_FORMAT; + } else if (codecVersion == ClusterMetadataManifest.CODEC_V2) { + return CLUSTER_METADATA_MANIFEST_FORMAT_V2; } else if (codecVersion == ClusterMetadataManifest.CODEC_V1) { return CLUSTER_METADATA_MANIFEST_FORMAT_V1; } else if (codecVersion == ClusterMetadataManifest.CODEC_V0) { @@ -1549,19 +1595,22 @@ private static class UploadedMetadataResults { UploadedMetadataAttribute uploadedCoordinationMetadata; UploadedMetadataAttribute uploadedSettingsMetadata; UploadedMetadataAttribute uploadedTemplatesMetadata; + List uploadedIndicesRoutingMetadata; public UploadedMetadataResults( List uploadedIndexMetadata, Map uploadedCustomMetadataMap, UploadedMetadataAttribute uploadedCoordinationMetadata, UploadedMetadataAttribute uploadedSettingsMetadata, - UploadedMetadataAttribute uploadedTemplatesMetadata + UploadedMetadataAttribute uploadedTemplatesMetadata, + List uploadedIndicesRoutingMetadata ) { this.uploadedIndexMetadata = uploadedIndexMetadata; this.uploadedCustomMetadataMap = uploadedCustomMetadataMap; this.uploadedCoordinationMetadata = uploadedCoordinationMetadata; this.uploadedSettingsMetadata = uploadedSettingsMetadata; this.uploadedTemplatesMetadata = uploadedTemplatesMetadata; + this.uploadedIndicesRoutingMetadata = uploadedIndicesRoutingMetadata; } public UploadedMetadataResults() { @@ -1570,6 +1619,7 @@ public UploadedMetadataResults() { this.uploadedCoordinationMetadata = null; this.uploadedSettingsMetadata = null; this.uploadedTemplatesMetadata = null; + this.uploadedIndicesRoutingMetadata = new ArrayList<>(); } } } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteIndexPath.java b/server/src/main/java/org/opensearch/index/remote/RemoteIndexPath.java index 89b642b79df86..dfa5b7afc9c25 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteIndexPath.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteIndexPath.java @@ -18,6 +18,7 @@ import org.opensearch.index.remote.RemoteStoreEnums.DataType; import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm; import org.opensearch.index.remote.RemoteStoreEnums.PathType; +import org.opensearch.index.remote.RemoteStorePathStrategy.BasePathInput; import org.opensearch.index.remote.RemoteStorePathStrategy.PathInput; import java.io.IOException; @@ -141,7 +142,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws DataCategory dataCategory = entry.getKey(); for (DataType type : entry.getValue()) { for (int shardNo = 0; shardNo < shardCount; shardNo++) { - PathInput pathInput = PathInput.builder() + BasePathInput pathInput = PathInput.builder() .basePath(new BlobPath().add(basePath)) .indexUUID(indexUUID) .shardId(Integer.toString(shardNo)) diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStoreEnums.java b/server/src/main/java/org/opensearch/index/remote/RemoteStoreEnums.java index c1ac74724e405..6118650f1924d 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStoreEnums.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStoreEnums.java @@ -13,7 +13,7 @@ import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.hash.FNV1a; -import org.opensearch.index.remote.RemoteStorePathStrategy.PathInput; +import org.opensearch.index.remote.RemoteStorePathStrategy.BasePathInput; import java.util.HashMap; import java.util.Locale; @@ -92,14 +92,10 @@ public String getName() { public enum PathType { FIXED(0) { @Override - public BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm) { + public BlobPath generatePath(BasePathInput pathInput, PathHashAlgorithm hashAlgorithm) { assert Objects.isNull(hashAlgorithm) : "hashAlgorithm is expected to be null with fixed remote store path type"; // Hash algorithm is not used in FIXED path type - return pathInput.basePath() - .add(pathInput.indexUUID()) - .add(pathInput.shardId()) - .add(pathInput.dataCategory().getName()) - .add(pathInput.dataType().getName()); + return pathInput.basePath().add(pathInput.fixedSubPath()); } @Override @@ -109,15 +105,9 @@ boolean requiresHashAlgorithm() { }, HASHED_PREFIX(1) { @Override - public BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm) { + public BlobPath generatePath(BasePathInput pathInput, PathHashAlgorithm hashAlgorithm) { assert Objects.nonNull(hashAlgorithm) : "hashAlgorithm is expected to be non-null"; - return BlobPath.cleanPath() - .add(hashAlgorithm.hash(pathInput)) - .add(pathInput.basePath()) - .add(pathInput.indexUUID()) - .add(pathInput.shardId()) - .add(pathInput.dataCategory().getName()) - .add(pathInput.dataType().getName()); + return BlobPath.cleanPath().add(hashAlgorithm.hash(pathInput)).add(pathInput.basePath()).add(pathInput.fixedSubPath()); } @Override @@ -127,14 +117,9 @@ boolean requiresHashAlgorithm() { }, HASHED_INFIX(2) { @Override - public BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm) { + public BlobPath generatePath(BasePathInput pathInput, PathHashAlgorithm hashAlgorithm) { assert Objects.nonNull(hashAlgorithm) : "hashAlgorithm is expected to be non-null"; - return pathInput.basePath() - .add(hashAlgorithm.hash(pathInput)) - .add(pathInput.indexUUID()) - .add(pathInput.shardId()) - .add(pathInput.dataCategory().getName()) - .add(pathInput.dataType().getName()); + return pathInput.basePath().add(hashAlgorithm.hash(pathInput)).add(pathInput.fixedSubPath()); } @Override @@ -185,18 +170,18 @@ public static PathType fromCode(int code) { * @param hashAlgorithm hashing algorithm. * @return the blob path for the path input. */ - public BlobPath path(PathInput pathInput, PathHashAlgorithm hashAlgorithm) { - DataCategory dataCategory = pathInput.dataCategory(); - DataType dataType = pathInput.dataType(); - assert dataCategory.isSupportedDataType(dataType) : "category:" - + dataCategory - + " type:" - + dataType - + " are not supported together"; + public BlobPath path(BasePathInput pathInput, PathHashAlgorithm hashAlgorithm) { + pathInput.assertIsValid(); return generatePath(pathInput, hashAlgorithm); } - protected abstract BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm); + // Added for BWC + public BlobPath path(RemoteStorePathStrategy.PathInput pathInput, PathHashAlgorithm hashAlgorithm) { + pathInput.assertIsValid(); + return generatePath(pathInput, hashAlgorithm); + } + + protected abstract BlobPath generatePath(BasePathInput pathInput, PathHashAlgorithm hashAlgorithm); abstract boolean requiresHashAlgorithm(); @@ -226,10 +211,12 @@ public enum PathHashAlgorithm { FNV_1A_BASE64(0) { @Override - String hash(PathInput pathInput) { - String input = pathInput.indexUUID() + pathInput.shardId() + pathInput.dataCategory().getName() + pathInput.dataType() - .getName(); - long hash = FNV1a.hash64(input); + String hash(BasePathInput pathInput) { + StringBuilder input = new StringBuilder(); + for (String path : pathInput.fixedSubPath().toArray()) { + input.append(path); + } + long hash = FNV1a.hash64(input.toString()); return longToUrlBase64(hash); } }, @@ -239,10 +226,12 @@ String hash(PathInput pathInput) { */ FNV_1A_COMPOSITE_1(1) { @Override - String hash(PathInput pathInput) { - String input = pathInput.indexUUID() + pathInput.shardId() + pathInput.dataCategory().getName() + pathInput.dataType() - .getName(); - long hash = FNV1a.hash64(input); + String hash(BasePathInput pathInput) { + StringBuilder input = new StringBuilder(); + for (String path : pathInput.fixedSubPath().toArray()) { + input.append(path); + } + long hash = FNV1a.hash64(input.toString()); return longToCompositeBase64AndBinaryEncoding(hash, 20); } }; @@ -281,7 +270,7 @@ public static PathHashAlgorithm fromCode(int code) { return CODE_TO_ENUM.get(code); } - abstract String hash(PathInput pathInput); + abstract String hash(BasePathInput pathInput); public static PathHashAlgorithm parseString(String pathHashAlgorithm) { try { diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategy.java b/server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategy.java index c58f6c3faac84..05357aaf6ec72 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategy.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategy.java @@ -67,29 +67,36 @@ public String toString() { return "RemoteStorePathStrategy{" + "type=" + type + ", hashAlgorithm=" + hashAlgorithm + '}'; } + public BlobPath generatePath(BasePathInput pathInput) { + return type.path(pathInput, hashAlgorithm); + } + + // Added for BWC public BlobPath generatePath(PathInput pathInput) { return type.path(pathInput, hashAlgorithm); } /** - * Wrapper class for the input required to generate path for remote store uploads. + * Wrapper class for the path input required to generate path for remote store uploads. This input is composed of + * basePath and indexUUID. + * * @opensearch.internal */ @PublicApi(since = "2.14.0") @ExperimentalApi - public static class PathInput { + public static class BasePathInput { private final BlobPath basePath; private final String indexUUID; - private final String shardId; - private final DataCategory dataCategory; - private final DataType dataType; - public PathInput(BlobPath basePath, String indexUUID, String shardId, DataCategory dataCategory, DataType dataType) { - this.basePath = Objects.requireNonNull(basePath); - this.indexUUID = Objects.requireNonNull(indexUUID); - this.shardId = Objects.requireNonNull(shardId); - this.dataCategory = Objects.requireNonNull(dataCategory); - this.dataType = Objects.requireNonNull(dataType); + // Adding for BWC + public BasePathInput(BlobPath basePath, String indexUUID) { + this.basePath = basePath; + this.indexUUID = indexUUID; + } + + public BasePathInput(Builder builder) { + this.basePath = Objects.requireNonNull(builder.basePath); + this.indexUUID = Objects.requireNonNull(builder.indexUUID); } BlobPath basePath() { @@ -100,6 +107,86 @@ String indexUUID() { return indexUUID; } + BlobPath fixedSubPath() { + return BlobPath.cleanPath().add(indexUUID); + } + + /** + * Returns a new builder for {@link BasePathInput}. + */ + public static Builder builder() { + return new Builder<>(); + } + + public void assertIsValid() { + // Input is always valid here. + } + + /** + * Builder for {@link BasePathInput}. + * + * @opensearch.internal + */ + @PublicApi(since = "2.14.0") + @ExperimentalApi + public static class Builder> { + private BlobPath basePath; + private String indexUUID; + + public T basePath(BlobPath basePath) { + this.basePath = basePath; + return self(); + } + + public Builder indexUUID(String indexUUID) { + this.indexUUID = indexUUID; + return self(); + } + + protected T self() { + return (T) this; + } + + public BasePathInput build() { + return new BasePathInput(this); + } + } + } + + /** + * Wrapper class for the data aware path input required to generate path for remote store uploads. This input is + * composed of the parent inputs, shard id, data category and data type. + * + * @opensearch.internal + */ + @PublicApi(since = "2.14.0") + @ExperimentalApi + public static class PathInput extends BasePathInput { + private final String shardId; + private final DataCategory dataCategory; + private final DataType dataType; + + // Adding for BWC + public PathInput(BlobPath basePath, String indexUUID, String shardId, DataCategory dataCategory, DataType dataType) { + super(basePath, indexUUID); + this.shardId = shardId; + this.dataCategory = dataCategory; + this.dataType = dataType; + } + + public PathInput(Builder builder) { + super(builder); + this.shardId = Objects.requireNonNull(builder.shardId); + this.dataCategory = Objects.requireNonNull(builder.dataCategory); + this.dataType = Objects.requireNonNull(builder.dataType); + assert dataCategory.isSupportedDataType(dataType) : "category:" + + dataCategory + + " type:" + + dataType + + " are not supported together"; + + } + String shardId() { return shardId; } @@ -112,6 +199,11 @@ DataType dataType() { return dataType; } + @Override + BlobPath fixedSubPath() { + return super.fixedSubPath().add(shardId).add(dataCategory.getName()).add(dataType.getName()); + } + /** * Returns a new builder for {@link PathInput}. */ @@ -126,20 +218,18 @@ public static Builder builder() { */ @PublicApi(since = "2.14.0") @ExperimentalApi - public static class Builder { - private BlobPath basePath; - private String indexUUID; + public static class Builder extends BasePathInput.Builder { private String shardId; private DataCategory dataCategory; private DataType dataType; public Builder basePath(BlobPath basePath) { - this.basePath = basePath; + super.basePath = basePath; return this; } public Builder indexUUID(String indexUUID) { - this.indexUUID = indexUUID; + super.indexUUID = indexUUID; return this; } @@ -158,8 +248,13 @@ public Builder dataType(DataType dataType) { return this; } + @Override + protected Builder self() { + return this; + } + public PathInput build() { - return new PathInput(basePath, indexUUID, shardId, dataCategory, dataType); + return new PathInput(this); } } } diff --git a/server/src/test/java/org/opensearch/cluster/routing/IndexShardRoutingTableTests.java b/server/src/test/java/org/opensearch/cluster/routing/IndexShardRoutingTableTests.java index e881016fb9305..6bfe60980adf3 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/IndexShardRoutingTableTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/IndexShardRoutingTableTests.java @@ -69,6 +69,12 @@ public void testEquals() { assertNotEquals(table1, null); assertNotEquals(table1, s); assertNotEquals(table1, table3); + + ShardRouting primary = TestShardRouting.newShardRouting(shardId, "node-1", true, ShardRoutingState.STARTED); + ShardRouting replica = TestShardRouting.newShardRouting(shardId, "node-2", false, ShardRoutingState.STARTED); + IndexShardRoutingTable table4 = new IndexShardRoutingTable(shardId, Arrays.asList(primary, replica)); + IndexShardRoutingTable table5 = new IndexShardRoutingTable(shardId, Arrays.asList(replica, primary)); + assertEquals(table4, table5); } public void testShardsMatchingPredicate() { diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java new file mode 100644 index 0000000000000..d0c2cca4b46f0 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactoryTests.java @@ -0,0 +1,51 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.remote; + +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.fs.FsRepository; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.function.Supplier; + +import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; + +public class RemoteRoutingTableServiceFactoryTests extends OpenSearchTestCase { + + Supplier repositoriesService; + + public void testGetServiceWhenRemoteRoutingDisabled() { + Settings settings = Settings.builder().build(); + RemoteRoutingTableService service = RemoteRoutingTableServiceFactory.getService( + repositoriesService, + settings, + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); + assertTrue(service instanceof NoopRemoteRoutingTableService); + } + + public void testGetServiceWhenRemoteRoutingEnabled() { + Settings settings = Settings.builder() + .put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository") + .put(FsRepository.REPOSITORIES_COMPRESS_SETTING.getKey(), false) + .build(); + Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build(); + FeatureFlags.initializeFeatureFlags(nodeSettings); + RemoteRoutingTableService service = RemoteRoutingTableServiceFactory.getService( + repositoriesService, + settings, + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); + assertTrue(service instanceof InternalRemoteRoutingTableService); + } +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java index 9a9cbfa153259..1c4b97de8b7fd 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java @@ -8,30 +8,78 @@ package org.opensearch.cluster.routing.remote; +import org.opensearch.Version; +import org.opensearch.action.LatchedActionListener; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.DiffableUtils; +import org.opensearch.cluster.coordination.CoordinationMetadata; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.common.CheckedRunnable; +import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.blobstore.stream.write.WriteContext; +import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.common.compress.DeflateCompressor; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.index.Index; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.RemoteClusterStateService; +import org.opensearch.index.remote.RemoteStoreEnums; +import org.opensearch.index.remote.RemoteStorePathStrategy; +import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.repositories.FilterRepository; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.RepositoryMissingException; import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.repositories.fs.FsRepository; import org.opensearch.test.OpenSearchTestCase; import org.junit.After; import org.junit.Before; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; +import org.mockito.ArgumentCaptor; + +import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_FILE_PREFIX; +import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_PATH_TOKEN; import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; +import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.startsWith; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class RemoteRoutingTableServiceTests extends OpenSearchTestCase { - private RemoteRoutingTableService remoteRoutingTableService; + private InternalRemoteRoutingTableService remoteRoutingTableService; private Supplier repositoriesServiceSupplier; private RepositoriesService repositoriesService; private BlobStoreRepository blobStoreRepository; + private BlobStore blobStore; + private BlobContainer blobContainer; + private BlobPath basePath; @Before public void setup() { @@ -41,15 +89,26 @@ public void setup() { Settings settings = Settings.builder() .put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository") + .put(FsRepository.REPOSITORIES_COMPRESS_SETTING.getKey(), false) .build(); blobStoreRepository = mock(BlobStoreRepository.class); + when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); + blobStore = mock(BlobStore.class); + blobContainer = mock(BlobContainer.class); when(repositoriesService.repository("routing_repository")).thenReturn(blobStoreRepository); + when(blobStoreRepository.blobStore()).thenReturn(blobStore); Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build(); FeatureFlags.initializeFeatureFlags(nodeSettings); - remoteRoutingTableService = new RemoteRoutingTableService(repositoriesServiceSupplier, settings); + basePath = BlobPath.cleanPath().add("base-path"); + + remoteRoutingTableService = new InternalRemoteRoutingTableService( + repositoriesServiceSupplier, + settings, + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); } @After @@ -60,7 +119,14 @@ public void teardown() throws Exception { public void testFailInitializationWhenRemoteRoutingDisabled() { final Settings settings = Settings.builder().build(); - assertThrows(AssertionError.class, () -> new RemoteRoutingTableService(repositoriesServiceSupplier, settings)); + assertThrows( + AssertionError.class, + () -> new InternalRemoteRoutingTableService( + repositoriesServiceSupplier, + settings, + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ) + ); } public void testFailStartWhenRepositoryNotSet() { @@ -74,4 +140,416 @@ public void testFailStartWhenNotBlobRepository() { assertThrows(AssertionError.class, () -> remoteRoutingTableService.start()); } + public void testGetIndicesRoutingMapDiff() { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + final Index index = new Index(indexName, "uuid"); + final IndexMetadata indexMetadata = new IndexMetadata.Builder(indexName).settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, "uuid") + .build() + ).numberOfShards(1).numberOfReplicas(1).build(); + + RoutingTable routingTable = RoutingTable.builder().addAsNew(indexMetadata).build(); + + DiffableUtils.MapDiff> diff = remoteRoutingTableService + .getIndicesRoutingMapDiff(routingTable, routingTable); + assertEquals(0, diff.getUpserts().size()); + assertEquals(0, diff.getDeletes().size()); + + // Reversing order to check for equality without order. + IndexRoutingTable indexRouting = routingTable.getIndicesRouting().get(indexName); + IndexRoutingTable indexRoutingTableReversed = IndexRoutingTable.builder(index) + .addShard(indexRouting.getShards().get(0).replicaShards().get(0)) + .addShard(indexRouting.getShards().get(0).primaryShard()) + .build(); + RoutingTable routingTable2 = RoutingTable.builder().add(indexRoutingTableReversed).build(); + + diff = remoteRoutingTableService.getIndicesRoutingMapDiff(routingTable, routingTable2); + assertEquals(0, diff.getUpserts().size()); + assertEquals(0, diff.getDeletes().size()); + } + + public void testGetIndicesRoutingMapDiffIndexAdded() { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + final IndexMetadata indexMetadata = new IndexMetadata.Builder(indexName).settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, "uuid") + .build() + ).numberOfShards(randomInt(1000)).numberOfReplicas(randomInt(10)).build(); + RoutingTable routingTable = RoutingTable.builder().addAsNew(indexMetadata).build(); + + String indexName2 = randomAlphaOfLength(randomIntBetween(1, 50)); + int noOfShards = randomInt(1000); + int noOfReplicas = randomInt(10); + final IndexMetadata indexMetadata2 = new IndexMetadata.Builder(indexName2).settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, "uuid2") + .build() + ).numberOfShards(noOfShards).numberOfReplicas(noOfReplicas).build(); + RoutingTable routingTable2 = RoutingTable.builder(routingTable).addAsNew(indexMetadata2).build(); + + DiffableUtils.MapDiff> diff = remoteRoutingTableService + .getIndicesRoutingMapDiff(routingTable, routingTable2); + assertEquals(1, diff.getUpserts().size()); + assertNotNull(diff.getUpserts().get(indexName2)); + assertEquals(noOfShards, diff.getUpserts().get(indexName2).getShards().size()); + + assertEquals(0, diff.getDeletes().size()); + } + + public void testGetIndicesRoutingMapDiffShardChanged() { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + final Index index = new Index(indexName, "uuid"); + int noOfShards = randomInt(1000); + int noOfReplicas = randomInt(10); + final IndexMetadata indexMetadata = new IndexMetadata.Builder(indexName).settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, "uuid") + .build() + ).numberOfShards(noOfShards).numberOfReplicas(noOfReplicas).build(); + + RoutingTable routingTable = RoutingTable.builder().addAsNew(indexMetadata).build(); + + final IndexMetadata indexMetadata2 = new IndexMetadata.Builder(indexName).settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, "uuid") + .build() + ).numberOfShards(noOfShards + 1).numberOfReplicas(noOfReplicas).build(); + RoutingTable routingTable2 = RoutingTable.builder().addAsNew(indexMetadata2).build(); + + DiffableUtils.MapDiff> diff = remoteRoutingTableService + .getIndicesRoutingMapDiff(routingTable, routingTable2); + assertEquals(1, diff.getUpserts().size()); + assertNotNull(diff.getUpserts().get(indexName)); + assertEquals(noOfShards + 1, diff.getUpserts().get(indexName).getShards().size()); + assertEquals(noOfReplicas + 1, diff.getUpserts().get(indexName).getShards().get(0).getSize()); + assertEquals(0, diff.getDeletes().size()); + + final IndexMetadata indexMetadata3 = new IndexMetadata.Builder(indexName).settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, "uuid") + .build() + ).numberOfShards(noOfShards + 1).numberOfReplicas(noOfReplicas + 1).build(); + RoutingTable routingTable3 = RoutingTable.builder().addAsNew(indexMetadata3).build(); + + diff = remoteRoutingTableService.getIndicesRoutingMapDiff(routingTable2, routingTable3); + assertEquals(1, diff.getUpserts().size()); + assertNotNull(diff.getUpserts().get(indexName)); + assertEquals(noOfShards + 1, diff.getUpserts().get(indexName).getShards().size()); + assertEquals(noOfReplicas + 2, diff.getUpserts().get(indexName).getShards().get(0).getSize()); + + assertEquals(0, diff.getDeletes().size()); + } + + public void testGetIndicesRoutingMapDiffShardDetailChanged() { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + final Index index = new Index(indexName, "uuid"); + int noOfShards = randomInt(1000); + int noOfReplicas = randomInt(10); + final IndexMetadata indexMetadata = new IndexMetadata.Builder(indexName).settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, "uuid") + .build() + ).numberOfShards(noOfShards).numberOfReplicas(noOfReplicas).build(); + + RoutingTable routingTable = RoutingTable.builder().addAsNew(indexMetadata).build(); + RoutingTable routingTable2 = RoutingTable.builder().addAsRecovery(indexMetadata).build(); + + DiffableUtils.MapDiff> diff = remoteRoutingTableService + .getIndicesRoutingMapDiff(routingTable, routingTable2); + assertEquals(1, diff.getUpserts().size()); + assertNotNull(diff.getUpserts().get(indexName)); + assertEquals(noOfShards, diff.getUpserts().get(indexName).getShards().size()); + assertEquals(noOfReplicas + 1, diff.getUpserts().get(indexName).getShards().get(0).getSize()); + assertEquals(0, diff.getDeletes().size()); + } + + public void testGetIndicesRoutingMapDiffIndexDeleted() { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + final IndexMetadata indexMetadata = new IndexMetadata.Builder(indexName).settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, "uuid") + .build() + ).numberOfShards(randomInt(1000)).numberOfReplicas(randomInt(10)).build(); + RoutingTable routingTable = RoutingTable.builder().addAsNew(indexMetadata).build(); + + String indexName2 = randomAlphaOfLength(randomIntBetween(1, 50)); + final IndexMetadata indexMetadata2 = new IndexMetadata.Builder(indexName2).settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, "uuid2") + .build() + ).numberOfShards(randomInt(1000)).numberOfReplicas(randomInt(10)).build(); + RoutingTable routingTable2 = RoutingTable.builder().addAsNew(indexMetadata2).build(); + + DiffableUtils.MapDiff> diff = remoteRoutingTableService + .getIndicesRoutingMapDiff(routingTable, routingTable2); + assertEquals(1, diff.getUpserts().size()); + assertNotNull(diff.getUpserts().get(indexName2)); + + assertEquals(1, diff.getDeletes().size()); + assertEquals(indexName, diff.getDeletes().get(0)); + } + + public void testGetIndexRoutingAsyncAction() throws IOException { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + ClusterState clusterState = createClusterState(indexName); + BlobPath expectedPath = getPath(); + + LatchedActionListener listener = mock(LatchedActionListener.class); + when(blobStore.blobContainer(expectedPath)).thenReturn(blobContainer); + + remoteRoutingTableService.start(); + CheckedRunnable runnable = remoteRoutingTableService.getIndexRoutingAsyncAction( + clusterState, + clusterState.routingTable().getIndicesRouting().get(indexName), + listener, + basePath + ); + assertNotNull(runnable); + runnable.run(); + + String expectedFilePrefix = String.join( + DELIMITER, + INDEX_ROUTING_FILE_PREFIX, + RemoteStoreUtils.invertLong(clusterState.term()), + RemoteStoreUtils.invertLong(clusterState.version()) + ); + verify(blobContainer, times(1)).writeBlob(startsWith(expectedFilePrefix), any(StreamInput.class), anyLong(), eq(true)); + verify(listener, times(1)).onResponse(any(ClusterMetadataManifest.UploadedMetadata.class)); + } + + public void testGetIndexRoutingAsyncActionFailureInBlobRepo() throws IOException { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + ClusterState clusterState = createClusterState(indexName); + BlobPath expectedPath = getPath(); + + LatchedActionListener listener = mock(LatchedActionListener.class); + when(blobStore.blobContainer(expectedPath)).thenReturn(blobContainer); + doThrow(new IOException("testing failure")).when(blobContainer).writeBlob(anyString(), any(StreamInput.class), anyLong(), eq(true)); + + remoteRoutingTableService.start(); + CheckedRunnable runnable = remoteRoutingTableService.getIndexRoutingAsyncAction( + clusterState, + clusterState.routingTable().getIndicesRouting().get(indexName), + listener, + basePath + ); + assertNotNull(runnable); + runnable.run(); + String expectedFilePrefix = String.join( + DELIMITER, + INDEX_ROUTING_FILE_PREFIX, + RemoteStoreUtils.invertLong(clusterState.term()), + RemoteStoreUtils.invertLong(clusterState.version()) + ); + verify(blobContainer, times(1)).writeBlob(startsWith(expectedFilePrefix), any(StreamInput.class), anyLong(), eq(true)); + verify(listener, times(1)).onFailure(any(RemoteClusterStateService.RemoteStateTransferException.class)); + } + + public void testGetIndexRoutingAsyncActionAsyncRepo() throws IOException { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + ClusterState clusterState = createClusterState(indexName); + BlobPath expectedPath = getPath(); + + LatchedActionListener listener = mock(LatchedActionListener.class); + blobContainer = mock(AsyncMultiStreamBlobContainer.class); + when(blobStore.blobContainer(expectedPath)).thenReturn(blobContainer); + ArgumentCaptor> actionListenerArgumentCaptor = ArgumentCaptor.forClass(ActionListener.class); + ArgumentCaptor writeContextArgumentCaptor = ArgumentCaptor.forClass(WriteContext.class); + ConcurrentHashMap capturedWriteContext = new ConcurrentHashMap<>(); + + doAnswer((i) -> { + actionListenerArgumentCaptor.getValue().onResponse(null); + WriteContext writeContext = writeContextArgumentCaptor.getValue(); + capturedWriteContext.put(writeContext.getFileName().split(DELIMITER)[0], writeContextArgumentCaptor.getValue()); + return null; + }).when((AsyncMultiStreamBlobContainer) blobContainer) + .asyncBlobUpload(writeContextArgumentCaptor.capture(), actionListenerArgumentCaptor.capture()); + + remoteRoutingTableService.start(); + CheckedRunnable runnable = remoteRoutingTableService.getIndexRoutingAsyncAction( + clusterState, + clusterState.routingTable().getIndicesRouting().get(indexName), + listener, + basePath + ); + assertNotNull(runnable); + runnable.run(); + + String expectedFilePrefix = String.join( + DELIMITER, + INDEX_ROUTING_FILE_PREFIX, + RemoteStoreUtils.invertLong(clusterState.term()), + RemoteStoreUtils.invertLong(clusterState.version()) + ); + assertEquals(1, actionListenerArgumentCaptor.getAllValues().size()); + assertEquals(1, writeContextArgumentCaptor.getAllValues().size()); + assertNotNull(capturedWriteContext.get("index_routing")); + assertEquals(capturedWriteContext.get("index_routing").getWritePriority(), WritePriority.URGENT); + assertTrue(capturedWriteContext.get("index_routing").getFileName().startsWith(expectedFilePrefix)); + } + + public void testGetIndexRoutingAsyncActionAsyncRepoFailureInRepo() throws IOException { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + ClusterState clusterState = createClusterState(indexName); + BlobPath expectedPath = getPath(); + + LatchedActionListener listener = mock(LatchedActionListener.class); + blobContainer = mock(AsyncMultiStreamBlobContainer.class); + when(blobStore.blobContainer(expectedPath)).thenReturn(blobContainer); + + doThrow(new IOException("Testing failure")).when((AsyncMultiStreamBlobContainer) blobContainer) + .asyncBlobUpload(any(WriteContext.class), any(ActionListener.class)); + + remoteRoutingTableService.start(); + CheckedRunnable runnable = remoteRoutingTableService.getIndexRoutingAsyncAction( + clusterState, + clusterState.routingTable().getIndicesRouting().get(indexName), + listener, + basePath + ); + assertNotNull(runnable); + runnable.run(); + verify(listener, times(1)).onFailure(any(RemoteClusterStateService.RemoteStateTransferException.class)); + } + + public void testGetAllUploadedIndicesRouting() { + final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder().build(); + final ClusterMetadataManifest.UploadedIndexMetadata uploadedIndexMetadata = new ClusterMetadataManifest.UploadedIndexMetadata( + "test-index", + "index-uuid", + "index-filename", + InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + ); + + List allIndiceRoutingMetadata = remoteRoutingTableService + .getAllUploadedIndicesRouting(previousManifest, List.of(uploadedIndexMetadata), List.of()); + assertNotNull(allIndiceRoutingMetadata); + assertEquals(1, allIndiceRoutingMetadata.size()); + assertEquals(uploadedIndexMetadata, allIndiceRoutingMetadata.get(0)); + } + + public void testGetAllUploadedIndicesRoutingExistingIndexInManifest() { + final ClusterMetadataManifest.UploadedIndexMetadata uploadedIndexMetadata = new ClusterMetadataManifest.UploadedIndexMetadata( + "test-index", + "index-uuid", + "index-filename", + InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + ); + final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() + .indicesRouting(List.of(uploadedIndexMetadata)) + .build(); + + List allIndiceRoutingMetadata = remoteRoutingTableService + .getAllUploadedIndicesRouting(previousManifest, List.of(uploadedIndexMetadata), List.of()); + assertNotNull(allIndiceRoutingMetadata); + assertEquals(1, allIndiceRoutingMetadata.size()); + assertEquals(uploadedIndexMetadata, allIndiceRoutingMetadata.get(0)); + } + + public void testGetAllUploadedIndicesRoutingNewIndexFromManifest() { + final ClusterMetadataManifest.UploadedIndexMetadata uploadedIndexMetadata = new ClusterMetadataManifest.UploadedIndexMetadata( + "test-index", + "index-uuid", + "index-filename", + InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + ); + final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() + .indicesRouting(List.of(uploadedIndexMetadata)) + .build(); + final ClusterMetadataManifest.UploadedIndexMetadata uploadedIndexMetadata2 = new ClusterMetadataManifest.UploadedIndexMetadata( + "test-index2", + "index-uuid", + "index-filename", + InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + ); + + List allIndiceRoutingMetadata = remoteRoutingTableService + .getAllUploadedIndicesRouting(previousManifest, List.of(uploadedIndexMetadata2), List.of()); + assertNotNull(allIndiceRoutingMetadata); + assertEquals(2, allIndiceRoutingMetadata.size()); + assertEquals(uploadedIndexMetadata, allIndiceRoutingMetadata.get(0)); + assertEquals(uploadedIndexMetadata2, allIndiceRoutingMetadata.get(1)); + } + + public void testGetAllUploadedIndicesRoutingIndexDeleted() { + final ClusterMetadataManifest.UploadedIndexMetadata uploadedIndexMetadata = new ClusterMetadataManifest.UploadedIndexMetadata( + "test-index", + "index-uuid", + "index-filename", + InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + ); + final ClusterMetadataManifest.UploadedIndexMetadata uploadedIndexMetadata2 = new ClusterMetadataManifest.UploadedIndexMetadata( + "test-index2", + "index-uuid", + "index-filename", + InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + ); + final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() + .indicesRouting(List.of(uploadedIndexMetadata, uploadedIndexMetadata2)) + .build(); + + List allIndiceRoutingMetadata = remoteRoutingTableService + .getAllUploadedIndicesRouting(previousManifest, List.of(uploadedIndexMetadata2), List.of("test-index")); + assertNotNull(allIndiceRoutingMetadata); + assertEquals(1, allIndiceRoutingMetadata.size()); + assertEquals(uploadedIndexMetadata2, allIndiceRoutingMetadata.get(0)); + } + + public void testGetAllUploadedIndicesRoutingNoChange() { + final ClusterMetadataManifest.UploadedIndexMetadata uploadedIndexMetadata = new ClusterMetadataManifest.UploadedIndexMetadata( + "test-index", + "index-uuid", + "index-filename", + InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + ); + final ClusterMetadataManifest.UploadedIndexMetadata uploadedIndexMetadata2 = new ClusterMetadataManifest.UploadedIndexMetadata( + "test-index2", + "index-uuid", + "index-filename", + InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + ); + final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() + .indicesRouting(List.of(uploadedIndexMetadata, uploadedIndexMetadata2)) + .build(); + + List allIndiceRoutingMetadata = remoteRoutingTableService + .getAllUploadedIndicesRouting(previousManifest, List.of(), List.of()); + assertNotNull(allIndiceRoutingMetadata); + assertEquals(2, allIndiceRoutingMetadata.size()); + assertEquals(uploadedIndexMetadata, allIndiceRoutingMetadata.get(0)); + assertEquals(uploadedIndexMetadata2, allIndiceRoutingMetadata.get(1)); + } + + private ClusterState createClusterState(String indexName) { + final IndexMetadata indexMetadata = new IndexMetadata.Builder(indexName).settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, "uuid") + .build() + ).numberOfShards(randomInt(1000)).numberOfReplicas(randomInt(10)).build(); + RoutingTable routingTable = RoutingTable.builder().addAsNew(indexMetadata).build(); + return ClusterState.builder(ClusterName.DEFAULT) + .routingTable(routingTable) + .metadata(Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(1L).build())) + .version(2L) + .build(); + } + + private BlobPath getPath() { + BlobPath indexRoutingPath = basePath.add(INDEX_ROUTING_PATH_TOKEN); + return RemoteStoreEnums.PathType.HASHED_PREFIX.path( + RemoteStorePathStrategy.BasePathInput.builder().basePath(indexRoutingPath).indexUUID("uuid").build(), + RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64 + ); + } } diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java index d1f559eb75f85..7cdadfe9e96c3 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java @@ -12,6 +12,7 @@ import org.opensearch.cluster.metadata.IndexGraveyard; import org.opensearch.cluster.metadata.RepositoriesMetadata; import org.opensearch.cluster.metadata.WeightedRoutingMetadata; +import org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService; import org.opensearch.common.xcontent.json.JsonXContent; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; @@ -366,6 +367,13 @@ public void testClusterMetadataManifestXContentV2() throws IOException { public void testClusterMetadataManifestXContentV3() throws IOException { UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path"); UploadedMetadataAttribute uploadedMetadataAttribute = new UploadedMetadataAttribute("attribute_name", "testing_attribute"); + UploadedIndexMetadata uploadedIndexRoutingMetadata = new UploadedIndexMetadata( + "test-index", + "test-uuid", + "routing-path", + InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + ); + ClusterMetadataManifest originalManifest = new ClusterMetadataManifest( 1L, 1L, @@ -400,7 +408,7 @@ public void testClusterMetadataManifestXContentV3() throws IOException { ) ).stream().collect(Collectors.toMap(UploadedMetadataAttribute::getAttributeName, Function.identity())), 1L, - Collections.singletonList(uploadedIndexMetadata) + Collections.singletonList(uploadedIndexRoutingMetadata) ); final XContentBuilder builder = JsonXContent.contentBuilder(); builder.startObject(); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 890a4b478b502..e91eeb82d44b9 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -19,6 +19,9 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.TemplatesMetadata; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService; +import org.opensearch.cluster.routing.remote.NoopRemoteRoutingTableService; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; @@ -162,6 +165,8 @@ public void setup() { blobStore = mock(BlobStore.class); when(blobStoreRepository.blobStore()).thenReturn(blobStore); when(repositoriesService.repository("remote_store_repository")).thenReturn(blobStoreRepository); + when(repositoriesService.repository("routing_repository")).thenReturn(blobStoreRepository); + when(blobStoreRepository.getNamedXContentRegistry()).thenReturn(xContentRegistry); remoteClusterStateService = new RemoteClusterStateService( "test-node-id", @@ -1402,7 +1407,7 @@ public void testGlobalMetadataUploadWaitTimeSetting() { } public void testRemoteRoutingTableNotInitializedWhenDisabled() { - assertFalse(remoteClusterStateService.getRemoteRoutingTableService().isPresent()); + assertTrue(remoteClusterStateService.getRemoteRoutingTableService() instanceof NoopRemoteRoutingTableService); } public void testRemoteRoutingTableInitializedWhenEnabled() { @@ -1425,7 +1430,172 @@ public void testRemoteRoutingTableInitializedWhenEnabled() { threadPool, List.of(new RemoteIndexPathUploader(threadPool, newSettings, repositoriesServiceSupplier, clusterSettings)) ); - assertTrue(remoteClusterStateService.getRemoteRoutingTableService().isPresent()); + assertTrue(remoteClusterStateService.getRemoteRoutingTableService() instanceof InternalRemoteRoutingTableService); + } + + public void testWriteFullMetadataSuccessWithRoutingTable() throws IOException { + initializeRoutingTable(); + mockBlobStoreObjects(); + when((blobStoreRepository.basePath())).thenReturn(BlobPath.cleanPath().add("base-path")); + + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + remoteClusterStateService.start(); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, "prev-cluster-uuid") + .getClusterMetadataManifest(); + final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); + final UploadedIndexMetadata uploadedIndiceRoutingMetadata = new UploadedIndexMetadata( + "test-index", + "index-uuid", + "routing-filename", + InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + ); + final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() + .indices(List.of(uploadedIndexMetadata)) + .clusterTerm(1L) + .stateVersion(1L) + .stateUUID("state-uuid") + .clusterUUID("cluster-uuid") + .previousClusterUUID("prev-cluster-uuid") + .routingTableVersion(1L) + .indicesRouting(List.of(uploadedIndiceRoutingMetadata)) + .build(); + + assertThat(manifest.getIndices().size(), is(1)); + assertThat(manifest.getClusterTerm(), is(expectedManifest.getClusterTerm())); + assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion())); + assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID())); + assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); + assertThat(manifest.getPreviousClusterUUID(), is(expectedManifest.getPreviousClusterUUID())); + assertThat(manifest.getRoutingTableVersion(), is(expectedManifest.getRoutingTableVersion())); + assertThat(manifest.getIndicesRouting().get(0).getIndexName(), is(uploadedIndiceRoutingMetadata.getIndexName())); + assertThat(manifest.getIndicesRouting().get(0).getIndexUUID(), is(uploadedIndiceRoutingMetadata.getIndexUUID())); + assertThat(manifest.getIndicesRouting().get(0).getUploadedFilename(), notNullValue()); + } + + public void testWriteFullMetadataInParallelSuccessWithRoutingTable() throws IOException { + initializeRoutingTable(); + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + AsyncMultiStreamBlobContainer container = (AsyncMultiStreamBlobContainer) mockBlobStoreObjects(AsyncMultiStreamBlobContainer.class); + + ArgumentCaptor> actionListenerArgumentCaptor = ArgumentCaptor.forClass(ActionListener.class); + ArgumentCaptor writeContextArgumentCaptor = ArgumentCaptor.forClass(WriteContext.class); + ConcurrentHashMap capturedWriteContext = new ConcurrentHashMap<>(); + doAnswer((i) -> { + actionListenerArgumentCaptor.getValue().onResponse(null); + WriteContext writeContext = writeContextArgumentCaptor.getValue(); + capturedWriteContext.put(writeContext.getFileName().split(DELIMITER)[0], writeContextArgumentCaptor.getValue()); + return null; + }).when(container).asyncBlobUpload(writeContextArgumentCaptor.capture(), actionListenerArgumentCaptor.capture()); + + when((blobStoreRepository.basePath())).thenReturn(BlobPath.cleanPath().add("base-path")); + + remoteClusterStateService.start(); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, "prev-cluster-uuid") + .getClusterMetadataManifest(); + + final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); + final UploadedIndexMetadata uploadedIndiceRoutingMetadata = new UploadedIndexMetadata( + "test-index", + "index-uuid", + "routing-filename", + InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + ); + + final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() + .indices(List.of(uploadedIndexMetadata)) + .clusterTerm(1L) + .stateVersion(1L) + .stateUUID("state-uuid") + .clusterUUID("cluster-uuid") + .previousClusterUUID("prev-cluster-uuid") + .routingTableVersion(1) + .indicesRouting(List.of(uploadedIndiceRoutingMetadata)) + .build(); + + assertThat(manifest.getIndices().size(), is(1)); + assertThat(manifest.getIndices().get(0).getIndexName(), is(uploadedIndexMetadata.getIndexName())); + assertThat(manifest.getIndices().get(0).getIndexUUID(), is(uploadedIndexMetadata.getIndexUUID())); + assertThat(manifest.getIndices().get(0).getUploadedFilename(), notNullValue()); + assertThat(manifest.getClusterTerm(), is(expectedManifest.getClusterTerm())); + assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion())); + assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID())); + assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); + assertThat(manifest.getPreviousClusterUUID(), is(expectedManifest.getPreviousClusterUUID())); + assertThat(manifest.getRoutingTableVersion(), is(expectedManifest.getRoutingTableVersion())); + assertThat(manifest.getIndicesRouting().get(0).getIndexName(), is(uploadedIndiceRoutingMetadata.getIndexName())); + assertThat(manifest.getIndicesRouting().get(0).getIndexUUID(), is(uploadedIndiceRoutingMetadata.getIndexUUID())); + assertThat(manifest.getIndicesRouting().get(0).getUploadedFilename(), notNullValue()); + + assertEquals(8, actionListenerArgumentCaptor.getAllValues().size()); + assertEquals(8, writeContextArgumentCaptor.getAllValues().size()); + } + + public void testWriteIncrementalMetadataSuccessWithRoutingTable() throws IOException { + initializeRoutingTable(); + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + mockBlobStoreObjects(); + final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); + final ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata(Metadata.builder().coordinationMetadata(coordinationMetadata)) + .build(); + + final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder().indices(Collections.emptyList()).build(); + when((blobStoreRepository.basePath())).thenReturn(BlobPath.cleanPath().add("base-path")); + + remoteClusterStateService.start(); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeIncrementalMetadata( + previousClusterState, + clusterState, + previousManifest + ).getClusterMetadataManifest(); + final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); + final UploadedIndexMetadata uploadedIndiceRoutingMetadata = new UploadedIndexMetadata( + "test-index", + "index-uuid", + "routing-filename", + InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + ); + final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() + .indices(List.of(uploadedIndexMetadata)) + .clusterTerm(1L) + .stateVersion(1L) + .stateUUID("state-uuid") + .clusterUUID("cluster-uuid") + .previousClusterUUID("prev-cluster-uuid") + .routingTableVersion(1) + .indicesRouting(List.of(uploadedIndiceRoutingMetadata)) + .build(); + + assertThat(manifest.getIndices().size(), is(1)); + assertThat(manifest.getClusterTerm(), is(expectedManifest.getClusterTerm())); + assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion())); + assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID())); + assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); + assertThat(manifest.getRoutingTableVersion(), is(expectedManifest.getRoutingTableVersion())); + assertThat(manifest.getIndicesRouting().get(0).getIndexName(), is(uploadedIndiceRoutingMetadata.getIndexName())); + assertThat(manifest.getIndicesRouting().get(0).getIndexUUID(), is(uploadedIndiceRoutingMetadata.getIndexUUID())); + assertThat(manifest.getIndicesRouting().get(0).getUploadedFilename(), notNullValue()); + } + + private void initializeRoutingTable() { + Settings newSettings = Settings.builder() + .put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository") + .put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, "remote_store_repository") + .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) + .build(); + clusterSettings.applySettings(newSettings); + + Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build(); + FeatureFlags.initializeFeatureFlags(nodeSettings); + remoteClusterStateService = new RemoteClusterStateService( + "test-node-id", + repositoriesServiceSupplier, + newSettings, + clusterService, + () -> 0L, + threadPool, + List.of(new RemoteIndexPathUploader(threadPool, newSettings, repositoriesServiceSupplier, clusterSettings)) + ); } private void mockObjectsForGettingPreviousClusterUUID(Map clusterUUIDsPointers) throws IOException { @@ -1850,7 +2020,8 @@ static ClusterState.Builder generateClusterStateWithOneIndex() { .templates(templatesMetadata) .putCustom(customMetadata1.getWriteableName(), customMetadata1) .build() - ); + ) + .routingTable(RoutingTable.builder().addAsNew(indexMetadata).version(1L).build()); } static DiscoveryNodes nodesWithLocalNodeClusterManager() { diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteStoreEnumsTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStoreEnumsTests.java index 575b397382f24..c3f52f3976414 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteStoreEnumsTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStoreEnumsTests.java @@ -92,17 +92,6 @@ public void testGeneratePathForFixedType() { result = FIXED.path(pathInput, null); assertEquals(basePath + dataCategory.getName() + SEPARATOR + dataType.getName() + SEPARATOR, result.buildAsString()); - // Translog Lock files - This is a negative case where the assertion will trip. - dataType = LOCK_FILES; - PathInput finalPathInput = PathInput.builder() - .basePath(blobPath) - .indexUUID(indexUUID) - .shardId(shardId) - .dataCategory(dataCategory) - .dataType(dataType) - .build(); - assertThrows(AssertionError.class, () -> FIXED.path(finalPathInput, null)); - // Segment Data dataCategory = SEGMENTS; dataType = DATA; @@ -208,27 +197,6 @@ public void testGeneratePathForHashedPrefixType() { result = HASHED_PREFIX.path(pathInput, FNV_1A_BASE64); assertEquals("oKU5SjILiy4/xjsdhj/ddjsha/yudy7sd/32hdhua7/89jdij/k2ijhe877d7yuhx7/10/translog/metadata/", result.buildAsString()); - // Translog Lock files - This is a negative case where the assertion will trip. - dataType = LOCK_FILES; - PathInput finalPathInput = PathInput.builder() - .basePath(blobPath) - .indexUUID(indexUUID) - .shardId(shardId) - .dataCategory(dataCategory) - .dataType(dataType) - .build(); - assertThrows(AssertionError.class, () -> HASHED_PREFIX.path(finalPathInput, null)); - - // assert with exact value for known base path - pathInput = PathInput.builder() - .basePath(fixedBlobPath) - .indexUUID(fixedIndexUUID) - .shardId(fixedShardId) - .dataCategory(dataCategory) - .dataType(dataType) - .build(); - assertThrows(AssertionError.class, () -> HASHED_PREFIX.path(finalPathInput, null)); - // Segment Data dataCategory = SEGMENTS; dataType = DATA; @@ -383,27 +351,6 @@ public void testGeneratePathForHashedPrefixTypeAndFNVCompositeHashAlgorithm() { result.buildAsString() ); - // Translog Lock files - This is a negative case where the assertion will trip. - dataType = LOCK_FILES; - PathInput finalPathInput = PathInput.builder() - .basePath(blobPath) - .indexUUID(indexUUID) - .shardId(shardId) - .dataCategory(dataCategory) - .dataType(dataType) - .build(); - assertThrows(AssertionError.class, () -> HASHED_PREFIX.path(finalPathInput, null)); - - // assert with exact value for known base path - pathInput = PathInput.builder() - .basePath(fixedBlobPath) - .indexUUID(fixedIndexUUID) - .shardId(fixedShardId) - .dataCategory(dataCategory) - .dataType(dataType) - .build(); - assertThrows(AssertionError.class, () -> HASHED_PREFIX.path(finalPathInput, null)); - // Segment Data dataCategory = SEGMENTS; dataType = DATA; @@ -509,7 +456,7 @@ public void testGeneratePathForHashedInfixType() { DataType dataType = DATA; String basePath = getPath(pathList); - basePath = basePath.length() == 0 ? basePath : basePath.substring(0, basePath.length() - 1); + basePath = basePath.isEmpty() ? basePath : basePath.substring(0, basePath.length() - 1); // Translog Data PathInput pathInput = PathInput.builder() .basePath(blobPath) @@ -567,27 +514,6 @@ public void testGeneratePathForHashedInfixType() { actual = result.buildAsString(); assertTrue(new ParameterizedMessage("expected={} actual={}", expected, actual).getFormattedMessage(), actual.startsWith(expected)); - // Translog Lock files - This is a negative case where the assertion will trip. - dataType = LOCK_FILES; - PathInput finalPathInput = PathInput.builder() - .basePath(blobPath) - .indexUUID(indexUUID) - .shardId(shardId) - .dataCategory(dataCategory) - .dataType(dataType) - .build(); - assertThrows(AssertionError.class, () -> HASHED_INFIX.path(finalPathInput, null)); - - // assert with exact value for known base path - pathInput = PathInput.builder() - .basePath(fixedBlobPath) - .indexUUID(fixedIndexUUID) - .shardId(fixedShardId) - .dataCategory(dataCategory) - .dataType(dataType) - .build(); - assertThrows(AssertionError.class, () -> HASHED_INFIX.path(finalPathInput, null)); - // Segment Data dataCategory = SEGMENTS; dataType = DATA; diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteStorePathStrategyTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStorePathStrategyTests.java new file mode 100644 index 0000000000000..217ffe804573e --- /dev/null +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStorePathStrategyTests.java @@ -0,0 +1,87 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.remote; + +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.test.OpenSearchTestCase; + +import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG; +import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA; +import static org.opensearch.index.remote.RemoteStoreEnums.DataType.LOCK_FILES; + +public class RemoteStorePathStrategyTests extends OpenSearchTestCase { + + private static final BlobPath BASE_PATH = BlobPath.cleanPath().add("base-path"); + private static final String INDEX_UUID = "indexUUID"; + private static final String SHARD_ID = "shardId"; + + public void testBasePathInput() { + assertThrows(NullPointerException.class, () -> RemoteStorePathStrategy.BasePathInput.builder().build()); + assertThrows(NullPointerException.class, () -> RemoteStorePathStrategy.BasePathInput.builder().basePath(BASE_PATH).build()); + assertThrows(NullPointerException.class, () -> RemoteStorePathStrategy.BasePathInput.builder().indexUUID(INDEX_UUID).build()); + RemoteStorePathStrategy.BasePathInput input = RemoteStorePathStrategy.BasePathInput.builder() + .basePath(BASE_PATH) + .indexUUID(INDEX_UUID) + .build(); + assertEquals(BASE_PATH, input.basePath()); + assertEquals(INDEX_UUID, input.indexUUID()); + } + + public void testPathInput() { + assertThrows(NullPointerException.class, () -> RemoteStorePathStrategy.PathInput.builder().build()); + assertThrows(NullPointerException.class, () -> RemoteStorePathStrategy.PathInput.builder().shardId(SHARD_ID).build()); + assertThrows( + NullPointerException.class, + () -> RemoteStorePathStrategy.PathInput.builder().shardId(SHARD_ID).dataCategory(TRANSLOG).build() + ); + + // Translog Lock files - This is a negative case where the assertion will trip. + assertThrows( + AssertionError.class, + () -> RemoteStorePathStrategy.PathInput.builder() + .basePath(BASE_PATH) + .indexUUID(INDEX_UUID) + .shardId(SHARD_ID) + .dataCategory(TRANSLOG) + .dataType(LOCK_FILES) + .build() + ); + + RemoteStorePathStrategy.PathInput input = RemoteStorePathStrategy.PathInput.builder() + .basePath(BASE_PATH) + .indexUUID(INDEX_UUID) + .shardId(SHARD_ID) + .dataCategory(TRANSLOG) + .dataType(DATA) + .build(); + assertEquals(BASE_PATH, input.basePath()); + assertEquals(INDEX_UUID, input.indexUUID()); + assertEquals(SHARD_ID, input.shardId()); + assertEquals(DATA, input.dataType()); + assertEquals(TRANSLOG, input.dataCategory()); + } + + public void testFixedSubPath() { + RemoteStorePathStrategy.BasePathInput input = RemoteStorePathStrategy.BasePathInput.builder() + .basePath(BASE_PATH) + .indexUUID(INDEX_UUID) + .build(); + assertEquals(BlobPath.cleanPath().add(INDEX_UUID), input.fixedSubPath()); + + RemoteStorePathStrategy.PathInput input2 = RemoteStorePathStrategy.PathInput.builder() + .basePath(BASE_PATH) + .indexUUID(INDEX_UUID) + .shardId(SHARD_ID) + .dataCategory(TRANSLOG) + .dataType(DATA) + .build(); + assertEquals(BlobPath.cleanPath().add(INDEX_UUID).add(SHARD_ID).add(TRANSLOG.getName()).add(DATA.getName()), input2.fixedSubPath()); + + } +}