From f5156c9102f089c2025ff0e99ec60eee4494e29e Mon Sep 17 00:00:00 2001 From: Rishikesh Pasham <62345295+Rishikesh1159@users.noreply.github.com> Date: Mon, 15 Aug 2022 17:43:10 +0000 Subject: [PATCH] [Segment Replication] Adding PrimaryMode check before publishing checkpoint and processing a received checkpoint. (#4157) * Adding PrimaryMode check before publishing checkpoint. Signed-off-by: Rishikesh1159 * Applying spotless check Signed-off-by: Rishikesh1159 * Moving segrep specific tests to SegmentReplicationIndexShardTests. Signed-off-by: Rishikesh1159 * Adding logic and tests for rejecting checkpoints if shard is in PrimaryMode. Signed-off-by: Rishikesh1159 * Applying ./gradlew :server:spotlessApply. Signed-off-by: Rishikesh1159 * Applying ./gradlew :server:spotlessApply Signed-off-by: Rishikesh1159 * Changing log level to warn in shouldProcessCheckpoint() of IndexShard.java class. Signed-off-by: Rishikesh1159 * Removing unnecessary lazy logging in shouldProcessCheckpoint(). Signed-off-by: Rishikesh1159 Signed-off-by: Rishikesh1159 --- .../shard/CheckpointRefreshListener.java | 2 +- .../opensearch/index/shard/IndexShard.java | 4 ++ .../index/shard/IndexShardTests.java | 2 +- .../SegmentReplicationIndexShardTests.java | 43 +++++++++++++++++++ .../SegmentReplicationTargetServiceTests.java | 17 ++++++++ 5 files changed, 66 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java index ac6754bf6a74a..96d74bea85920 100644 --- a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java @@ -40,7 +40,7 @@ public void beforeRefresh() throws IOException { @Override public void afterRefresh(boolean didRefresh) throws IOException { - if (didRefresh) { + if (didRefresh && shard.getReplicationTracker().isPrimaryMode()) { 
publisher.publish(shard); } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 5b7c204eecdb5..f3ad41d56687b 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1420,6 +1420,10 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp logger.trace(() -> new ParameterizedMessage("Ignoring new replication checkpoint - shard is not started {}", state())); return false; } + if (getReplicationTracker().isPrimaryMode()) { + logger.warn("Ignoring new replication checkpoint - shard is in primaryMode and cannot receive any checkpoints."); + return false; + } ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint(); if (localCheckpoint.isAheadOf(requestCheckpoint)) { logger.trace( diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 49d0c089f072b..a3a49e9e30564 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -3453,7 +3453,7 @@ public void testCheckpointRefreshListenerWithNull() throws IOException { } /** - * creates a new initializing shard. The shard will will be put in its proper path under the + * creates a new initializing shard. The shard will be put in its proper path under the * current node id the shard is assigned to. 
* @param checkpointPublisher Segment Replication Checkpoint Publisher to publish checkpoint */ diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 4f2784db93df2..d10f8ced963b7 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -14,11 +14,17 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.ReplicationType; import java.io.IOException; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; + public class SegmentReplicationIndexShardTests extends OpenSearchIndexLevelReplicationTestCase { private static final Settings settings = Settings.builder() @@ -80,4 +86,41 @@ public void testIgnoreShardIdle() throws Exception { replica.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b)); } } + + /** + * Starts a new primary shard in PrimaryMode and verifies that the shard publishes a checkpoint after refresh.
+ */ + public void testPublishCheckpointOnPrimaryMode() throws IOException { + final SegmentReplicationCheckpointPublisher mock = mock(SegmentReplicationCheckpointPublisher.class); + IndexShard shard = newStartedShard(true); + CheckpointRefreshListener refreshListener = new CheckpointRefreshListener(shard, mock); + refreshListener.afterRefresh(true); + + // verify checkpoint is published + verify(mock, times(1)).publish(any()); + closeShards(shard); + } + + /** + * Starts a new primary shard in PrimaryMode and begins a relocation handoff. Once the handoff completes, the shard is no longer + * in PrimaryMode, so this test verifies that the shard does not publish a checkpoint after refresh. + */ + public void testPublishCheckpointAfterRelocationHandOff() throws IOException { + final SegmentReplicationCheckpointPublisher mock = mock(SegmentReplicationCheckpointPublisher.class); + IndexShard shard = newStartedShard(true); + CheckpointRefreshListener refreshListener = new CheckpointRefreshListener(shard, mock); + String id = shard.routingEntry().allocationId().getId(); + + // Starting relocation handoff + shard.getReplicationTracker().startRelocationHandoff(id); + + // Completing relocation handoff + shard.getReplicationTracker().completeRelocationHandoff(); + refreshListener.afterRefresh(true); + + // verify checkpoint is not published + verify(mock, times(0)).publish(any()); + closeShards(shard); + } + } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 365b4615728b1..2916f4c8152a2 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -208,6 +208,23 @@ public void testNewCheckpoint_validationPassesAndReplicationFails()
throws IOExc closeShard(indexShard, false); } + /** + * Starts a new shard in PrimaryMode and verifies that a received checkpoint is not processed while the shard is in PrimaryMode. + */ + public void testRejectCheckpointOnShardPrimaryMode() throws IOException { + SegmentReplicationTargetService spy = spy(sut); + + // Starting a new shard in PrimaryMode. + IndexShard primaryShard = newStartedShard(true); + IndexShard spyShard = spy(primaryShard); + doNothing().when(spy).startReplication(any(), any(), any()); + spy.onNewCheckpoint(aheadCheckpoint, spyShard); + + // Verify that checkpoint is not processed as shard is in PrimaryMode. + verify(spy, times(0)).startReplication(any(), any(), any()); + closeShards(primaryShard); + } + public void testReplicationOnDone() throws IOException { SegmentReplicationTargetService spy = spy(sut); IndexShard spyShard = spy(indexShard);