Skip to content

Commit

Permalink
Adding check to make sure checkpoint is not processed when a shard's …
Browse files Browse the repository at this point in the history
…shard routing is primary. (opensearch-project#4630)

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
Co-authored-by: Suraj Singh <surajrider@gmail.com>
  • Loading branch information
2 people authored and sashashura committed Oct 12, 2022
1 parent a7c157e commit 9549360
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Fixed the ignore_malformed setting to also ignore objects ([#4494](https://github.com/opensearch-project/OpenSearch/pull/4494))
- [Segment Replication] Ignore lock file when testing cleanupAndPreserveLatestCommitPoint ([#4544](https://github.com/opensearch-project/OpenSearch/pull/4544))
- Updated jackson to 2.13.4 and snakeyml to 1.32 ([#4556](https://github.com/opensearch-project/OpenSearch/pull/4556))
- [Segment Replication] Adding check to make sure checkpoint is not processed when a shard's shard routing is primary ([#4630](https://github.com/opensearch-project/OpenSearch/pull/4630))
- [Bug]: Fixed invalid location of JDK dependency for arm64 architecture([#4613](https://github.com/opensearch-project/OpenSearch/pull/4613))
- [Bug]: Alias filter lost after rollover ([#4499](https://github.com/opensearch-project/OpenSearch/pull/4499))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1441,6 +1441,10 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp
logger.warn("Ignoring new replication checkpoint - shard is in primaryMode and cannot receive any checkpoints.");
return false;
}
if (this.routingEntry().primary()) {
logger.warn("Ignoring new replication checkpoint - primary shard cannot receive any checkpoints.");
return false;
}
ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint();
if (localCheckpoint.isAheadOf(requestCheckpoint)) {
logger.trace(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy;

public class SegmentReplicationIndexShardTests extends OpenSearchIndexLevelReplicationTestCase {

Expand Down Expand Up @@ -208,6 +209,36 @@ public void testPublishCheckpointAfterRelocationHandOff() throws IOException {
closeShards(shard);
}

/**
* here we are starting a new primary shard and testing that we don't process a checkpoint on a shard when it's shard routing is primary.
*/
public void testRejectCheckpointOnShardRoutingPrimary() throws IOException {
IndexShard primaryShard = newStartedShard(true);
SegmentReplicationTargetService sut;
sut = prepareForReplication(primaryShard);
SegmentReplicationTargetService spy = spy(sut);

// Starting a new shard in PrimaryMode and shard routing primary.
IndexShard spyShard = spy(primaryShard);
String id = primaryShard.routingEntry().allocationId().getId();

// Starting relocation handoff
primaryShard.getReplicationTracker().startRelocationHandoff(id);

// Completing relocation handoff.
primaryShard.getReplicationTracker().completeRelocationHandoff();

// Assert that primary shard is no longer in Primary Mode and shard routing is still Primary
assertEquals(false, primaryShard.getReplicationTracker().isPrimaryMode());
assertEquals(true, primaryShard.routingEntry().primary());

spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L, 0L), spyShard);

// Verify that checkpoint is not processed as shard routing is primary.
verify(spy, times(0)).startReplication(any(), any(), any());
closeShards(primaryShard);
}

public void testReplicaReceivesGenIncrease() throws Exception {
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
shards.startAll();
Expand Down

0 comments on commit 9549360

Please sign in to comment.