From efbd59a718f8cfd289ead6c355fa95aebe610d1c Mon Sep 17 00:00:00 2001 From: Ankit Kala Date: Thu, 8 Sep 2022 12:54:11 +0530 Subject: [PATCH] Addressing PR comments Signed-off-by: Ankit Kala --- CHANGELOG.md | 1 + .../opensearch/index/shard/IndexShard.java | 5 +- .../index/shard/IndexShardTests.java | 60 +++++++++++++++++++ 3 files changed, 64 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a10824a56af05..8f6a3d4671b14 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Plugin ZIP publication groupId value is configurable ([#4156](https://github.com/opensearch-project/OpenSearch/pull/4156)) - Add index specific setting for remote repository ([#4253](https://github.com/opensearch-project/OpenSearch/pull/4253)) - [Segment Replication] Update replicas to commit SegmentInfos instead of relying on SIS files from primary shards. ([#4402](https://github.com/opensearch-project/OpenSearch/pull/4402)) +- [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948)) ### Deprecated diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 2277930042a1c..d2ab97f24d09c 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -2360,10 +2360,11 @@ public Translog.Snapshot getHistoryOperations(String reason, long startingSeqNo, /** * Creates a new history snapshot from the translog instead of the lucene index. Required for cross cluster replication. * Use the recommended {@link #getHistoryOperations(String, long, long, boolean)} method for other cases. - * Depending on how translog durability is configured, this method might/might not return the snapshot. For e.g, - * If the translog has been configured with no-durability, it'll return UnsupportedOperationException. + * This method should only be invoked if Segment Replication or Remote Store is not enabled. */ public Translog.Snapshot getHistoryOperationsFromTranslog(long startingSeqNo, long endSeqNo) throws IOException { + assert !(indexSettings.isSegRepEnabled() || indexSettings.isRemoteStoreEnabled()) + : "unsupported operation for segment replication enabled indices or remote store backed indices"; return getEngine().translogManager().newChangesSnapshot(startingSeqNo, endSeqNo, true); } diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 662afa80f65fc..27c0437236f63 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -1196,6 +1196,66 @@ public void testAcquireReplicaPermitAdvanceMaxSeqNoOfUpdates() throws Exception closeShards(replica); } + public void testGetChangesSnapshotThrowsAssertForSegRep() throws IOException { + final ShardId shardId = new ShardId("index", "_na_", 0); + final ShardRouting shardRouting = TestShardRouting.newShardRouting( + shardId, + randomAlphaOfLength(8), + true, + ShardRoutingState.INITIALIZING, + RecoverySource.EmptyStoreRecoverySource.INSTANCE + ); + final Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT.toString()) + .build(); + final IndexMetadata.Builder indexMetadata = IndexMetadata.builder(shardRouting.getIndexName()).settings(settings).primaryTerm(0, 1); + final AtomicBoolean synced = new AtomicBoolean(); + final IndexShard primaryShard = newShard( + shardRouting, + indexMetadata.build(), + null, + new InternalEngineFactory(), + () -> synced.set(true), + RetentionLeaseSyncer.EMPTY, + null + ); + expectThrows(AssertionError.class, () -> primaryShard.getHistoryOperationsFromTranslog(0, 1)); + closeShard(primaryShard, false); + } + + public void testGetChangesSnapshotThrowsAssertForRemoteStore() throws IOException { + final ShardId shardId = new ShardId("index", "_na_", 0); + final ShardRouting shardRouting = TestShardRouting.newShardRouting( + shardId, + randomAlphaOfLength(8), + true, + ShardRoutingState.INITIALIZING, + RecoverySource.EmptyStoreRecoverySource.INSTANCE + ); + final Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .build(); + final IndexMetadata.Builder indexMetadata = IndexMetadata.builder(shardRouting.getIndexName()).settings(settings).primaryTerm(0, 1); + final AtomicBoolean synced = new AtomicBoolean(); + final IndexShard primaryShard = newShard( + shardRouting, + indexMetadata.build(), + null, + new InternalEngineFactory(), + () -> synced.set(true), + RetentionLeaseSyncer.EMPTY, + null + ); + expectThrows(AssertionError.class, () -> primaryShard.getHistoryOperationsFromTranslog(0, 1)); + closeShard(primaryShard, false); + } + public void testGlobalCheckpointSync() throws IOException { // create the primary shard with a callback that sets a boolean when the global checkpoint sync is invoked final ShardId shardId = new ShardId("index", "_na_", 0);