From 1a2c70b66b32dab2460b87c1134b8e01a2af6122 Mon Sep 17 00:00:00 2001 From: Kartik Ganesh Date: Wed, 1 Jun 2022 14:46:36 -0700 Subject: [PATCH] Incorporating PR comments Signed-off-by: Kartik Ganesh --- .../index/engine/InternalEngine.java | 2 - .../SegmentReplicationSourceService.java | 15 +++-- .../index/engine/InternalEngineTests.java | 56 ------------------- 3 files changed, 11 insertions(+), 62 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 5b28b29914b4e..d65c06cce1b69 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -2307,8 +2307,6 @@ public SegmentInfos getLatestSegmentInfos() { @Override public GatedCloseable getSegmentInfosSnapshot() { - // this should never be called by read-only engines - assert (engineConfig.isReadOnlyReplica() == false); final SegmentInfos segmentInfos = getLatestSegmentInfos(); try { indexWriter.incRefDeleter(segmentInfos); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index 4ca15352a065b..ebcf2d27e6933 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -121,8 +121,15 @@ private synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoi final IndexShard indexShard = indexService.getShard(shardId.id()); // build the CopyState object and cache it before returning final CopyState copyState = new CopyState(indexShard); - // TODO This will add with the latest checkpoint, not the one from the request - addToCopyStateMap(copyState); + + /** + * Use the checkpoint from the request as the key in the map, rather than + * the checkpoint from the created CopyState. This maximizes cache hits + * if replication targets make a request with an older checkpoint. + * Replication targets are expected to fetch the checkpoint in the response + * CopyState to bring themselves up to date. + */ + addToCopyStateMap(checkpoint, copyState); return copyState; } } @@ -131,8 +138,8 @@ private synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoi * Adds the input {@link CopyState} object to {@link #copyStateMap}. * The key is the CopyState's {@link ReplicationCheckpoint} object. */ - private void addToCopyStateMap(CopyState copyState) { - copyStateMap.putIfAbsent(copyState.getCheckpoint(), copyState); + private void addToCopyStateMap(ReplicationCheckpoint checkpoint, CopyState copyState) { + copyStateMap.putIfAbsent(checkpoint, copyState); } /** diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index eb92abf18e74e..b14ad15070118 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -147,7 +147,6 @@ import org.opensearch.index.translog.TranslogConfig; import org.opensearch.index.translog.TranslogDeletionPolicyFactory; import org.opensearch.indices.breaker.NoneCircuitBreakerService; -import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.IndexSettingsModule; import org.opensearch.test.VersionUtils; import org.opensearch.threadpool.ThreadPool; @@ -7388,61 +7387,6 @@ public void testMaxDocsOnReplica() throws Exception { } } - public void testGetSegmentInfosSnapshot_OnReadReplica() throws IOException { - engine.close(); - Store store = createStore(); - // create an engine just so we can easily fetch the engine config constructor parameters - InternalEngine tempEngine = createEngine(store, createTempDir()); - EngineConfig tempConfig = tempEngine.config(); - // read-only engine config requires the replication type setting to be SEGMENT - final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( - "test", - Settings.builder() - .put(defaultSettings.getSettings()) - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .build() - ); - // create the read-only engine config - EngineConfig readOnlyEngineConfig = new EngineConfig( - tempConfig.getShardId(), - tempConfig.getThreadPool(), - indexSettings, - tempConfig.getWarmer(), - store, - tempConfig.getMergePolicy(), - tempConfig.getAnalyzer(), - tempConfig.getSimilarity(), - new CodecService(null, logger), - tempConfig.getEventListener(), - tempConfig.getQueryCache(), - tempConfig.getQueryCachingPolicy(), - tempConfig.getTranslogConfig(), - null, - tempConfig.getFlushMergesAfter(), - tempConfig.getExternalRefreshListener(), - tempConfig.getInternalRefreshListener(), - tempConfig.getIndexSort(), - tempConfig.getCircuitBreakerService(), - tempConfig.getGlobalCheckpointSupplier(), - tempConfig.retentionLeasesSupplier(), - tempConfig.getPrimaryTermSupplier(), - tempConfig.getTombstoneDocSupplier(), - true - ); - // close engine now that it is no longer needed - tempEngine.close(); - - SetOnce indexWriterHolder = new SetOnce<>(); - IndexWriterFactory indexWriterFactory = (directory, iwc) -> { - indexWriterHolder.set(new IndexWriter(directory, iwc)); - return indexWriterHolder.get(); - }; - InternalEngine engine = createEngine(readOnlyEngineConfig); - expectThrows(AssertionError.class, engine::getSegmentInfosSnapshot); - engine.close(); - store.close(); - } - public void testGetSegmentInfosSnapshot() throws IOException { IOUtils.close(store, engine); Store store = createStore();