Skip to content

Commit

Permalink
Incorporating PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Kartik Ganesh <gkart@amazon.com>
  • Loading branch information
kartg committed Jun 1, 2022
1 parent 6ecb560 commit 1a2c70b
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2307,8 +2307,6 @@ public SegmentInfos getLatestSegmentInfos() {

@Override
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
// this should never be called by read-only engines
assert (engineConfig.isReadOnlyReplica() == false);
final SegmentInfos segmentInfos = getLatestSegmentInfos();
try {
indexWriter.incRefDeleter(segmentInfos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,15 @@ private synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoi
final IndexShard indexShard = indexService.getShard(shardId.id());
// build the CopyState object and cache it before returning
final CopyState copyState = new CopyState(indexShard);
// TODO This will add with the latest checkpoint, not the one from the request
addToCopyStateMap(copyState);

/**
* Use the checkpoint from the request as the key in the map, rather than
* the checkpoint from the created CopyState. This maximizes cache hits
* if replication targets make a request with an older checkpoint.
* Replication targets are expected to fetch the checkpoint in the response
* CopyState to bring themselves up to date.
*/
addToCopyStateMap(checkpoint, copyState);
return copyState;
}
}
Expand All @@ -131,8 +138,8 @@ private synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoi
* Adds the input {@link CopyState} object to {@link #copyStateMap}.
* The key is the CopyState's {@link ReplicationCheckpoint} object.
*/
private void addToCopyStateMap(CopyState copyState) {
copyStateMap.putIfAbsent(copyState.getCheckpoint(), copyState);
private void addToCopyStateMap(ReplicationCheckpoint checkpoint, CopyState copyState) {
copyStateMap.putIfAbsent(checkpoint, copyState);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogDeletionPolicyFactory;
import org.opensearch.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.IndexSettingsModule;
import org.opensearch.test.VersionUtils;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -7388,61 +7387,6 @@ public void testMaxDocsOnReplica() throws Exception {
}
}

public void testGetSegmentInfosSnapshot_OnReadReplica() throws IOException {
engine.close();
Store store = createStore();
// create an engine just so we can easily fetch the engine config constructor parameters
InternalEngine tempEngine = createEngine(store, createTempDir());
EngineConfig tempConfig = tempEngine.config();
// read-only engine config requires the replication type setting to be SEGMENT
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(
"test",
Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build()
);
// create the read-only engine config
EngineConfig readOnlyEngineConfig = new EngineConfig(
tempConfig.getShardId(),
tempConfig.getThreadPool(),
indexSettings,
tempConfig.getWarmer(),
store,
tempConfig.getMergePolicy(),
tempConfig.getAnalyzer(),
tempConfig.getSimilarity(),
new CodecService(null, logger),
tempConfig.getEventListener(),
tempConfig.getQueryCache(),
tempConfig.getQueryCachingPolicy(),
tempConfig.getTranslogConfig(),
null,
tempConfig.getFlushMergesAfter(),
tempConfig.getExternalRefreshListener(),
tempConfig.getInternalRefreshListener(),
tempConfig.getIndexSort(),
tempConfig.getCircuitBreakerService(),
tempConfig.getGlobalCheckpointSupplier(),
tempConfig.retentionLeasesSupplier(),
tempConfig.getPrimaryTermSupplier(),
tempConfig.getTombstoneDocSupplier(),
true
);
// close engine now that it is no longer needed
tempEngine.close();

SetOnce<IndexWriter> indexWriterHolder = new SetOnce<>();
IndexWriterFactory indexWriterFactory = (directory, iwc) -> {
indexWriterHolder.set(new IndexWriter(directory, iwc));
return indexWriterHolder.get();
};
InternalEngine engine = createEngine(readOnlyEngineConfig);
expectThrows(AssertionError.class, engine::getSegmentInfosSnapshot);
engine.close();
store.close();
}

public void testGetSegmentInfosSnapshot() throws IOException {
IOUtils.close(store, engine);
Store store = createStore();
Expand Down

0 comments on commit 1a2c70b

Please sign in to comment.