Skip to content

Commit

Permalink
[Segment Replication] Adding PrimaryMode check before publishing chec…
Browse files Browse the repository at this point in the history
…kpoint and processing a received checkpoint. (opensearch-project#4157)

* Adding PrimaryMode check before publishing checkpoint.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Applying spotless check

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Moving segrep specific tests to SegmentReplicationIndexShardTests.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Adding logic and tests for rejecting checkpoints if shard is in PrimaryMode.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Applying ./gradlew :server:spotlessApply.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Applying ./gradlew :server:spotlessApply

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Changing log level to warn in shouldProcessCheckpoint() of IndexShard.java class.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Removing unnecessary lazy logging in shouldProcessCheckpoint().

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
  • Loading branch information
Rishikesh1159 committed Aug 17, 2022
1 parent 6edcf1a commit f5156c9
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void beforeRefresh() throws IOException {

/**
 * Refresh hook that hands the shard to the checkpoint publisher.
 * Publishing only happens when {@code didRefresh} is true and the shard's
 * replication tracker reports primary mode.
 *
 * @param didRefresh whether the refresh actually took place
 * @throws IOException declared by the listener contract
 */
@Override
public void afterRefresh(boolean didRefresh) throws IOException {
if (didRefresh) {
if (didRefresh && shard.getReplicationTracker().isPrimaryMode()) {
publisher.publish(shard);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1420,6 +1420,10 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp
logger.trace(() -> new ParameterizedMessage("Ignoring new replication checkpoint - shard is not started {}", state()));
return false;
}
if (getReplicationTracker().isPrimaryMode()) {
logger.warn("Ignoring new replication checkpoint - shard is in primaryMode and cannot receive any checkpoints.");
return false;
}
ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint();
if (localCheckpoint.isAheadOf(requestCheckpoint)) {
logger.trace(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3453,7 +3453,7 @@ public void testCheckpointRefreshListenerWithNull() throws IOException {
}

/**
* creates a new initializing shard. The shard will will be put in its proper path under the
* creates a new initializing shard. The shard will be put in its proper path under the
* current node id the shard is assigned to.
* @param checkpointPublisher Segment Replication Checkpoint Publisher to publish checkpoint
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,17 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationType;

import java.io.IOException;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;

public class SegmentReplicationIndexShardTests extends OpenSearchIndexLevelReplicationTestCase {

private static final Settings settings = Settings.builder()
Expand Down Expand Up @@ -80,4 +86,41 @@ public void testIgnoreShardIdle() throws Exception {
replica.awaitShardSearchActive(b -> assertFalse("A new RefreshListener should not be registered", b));
}
}

/**
 * Verifies that a freshly started primary shard (which is in primary mode) publishes a
 * checkpoint exactly once when its {@link CheckpointRefreshListener} is notified of a refresh.
 *
 * @throws IOException propagated from the shard helpers or the refresh listener
 */
public void testPublishCheckpointOnPrimaryMode() throws IOException {
    final SegmentReplicationCheckpointPublisher publisher = mock(SegmentReplicationCheckpointPublisher.class);
    final IndexShard shard = newStartedShard(true);
    try {
        final CheckpointRefreshListener refreshListener = new CheckpointRefreshListener(shard, publisher);
        refreshListener.afterRefresh(true);

        // verify checkpoint is published
        verify(publisher, times(1)).publish(any());
    } finally {
        // Always release the shard, even when the verification above fails,
        // so a failed assertion does not leave a started shard behind.
        closeShards(shard);
    }
}

/**
 * Verifies that no checkpoint is published once a primary shard has gone through a relocation
 * handoff. The shard starts in primary mode; after the handoff is started and completed on its
 * replication tracker it is no longer in primary mode, so a refresh must not publish.
 *
 * @throws IOException propagated from the shard helpers or the refresh listener
 */
public void testPublishCheckpointAfterRelocationHandOff() throws IOException {
    final SegmentReplicationCheckpointPublisher publisher = mock(SegmentReplicationCheckpointPublisher.class);
    final IndexShard shard = newStartedShard(true);
    try {
        final CheckpointRefreshListener refreshListener = new CheckpointRefreshListener(shard, publisher);
        final String id = shard.routingEntry().allocationId().getId();

        // Starting relocation handoff
        shard.getReplicationTracker().startRelocationHandoff(id);

        // Completing relocation handoff
        shard.getReplicationTracker().completeRelocationHandoff();
        refreshListener.afterRefresh(true);

        // verify checkpoint is not published
        verify(publisher, times(0)).publish(any());
    } finally {
        // Always release the shard, even when the handoff or the verification fails,
        // so a failed assertion does not leave a started shard behind.
        closeShards(shard);
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,23 @@ public void testNewCheckpoint_validationPassesAndReplicationFails() throws IOExc
closeShard(indexShard, false);
}

/**
 * Verifies that a checkpoint delivered for a shard in primary mode is rejected: the target
 * service must never start replication for it.
 *
 * @throws IOException propagated from the shard helpers
 */
public void testRejectCheckpointOnShardPrimaryMode() throws IOException {
    SegmentReplicationTargetService spy = spy(sut);

    // Starting a new shard in PrimaryMode.
    IndexShard primaryShard = newStartedShard(true);
    try {
        IndexShard spyShard = spy(primaryShard);
        // Stub out replication so an unexpected call is recorded without doing real work.
        doNothing().when(spy).startReplication(any(), any(), any());
        spy.onNewCheckpoint(aheadCheckpoint, spyShard);

        // Verify that checkpoint is not processed as shard is in PrimaryMode.
        verify(spy, times(0)).startReplication(any(), any(), any());
    } finally {
        // Always release the shard, even when the verification above fails.
        closeShards(primaryShard);
    }
}

public void testReplicationOnDone() throws IOException {
SegmentReplicationTargetService spy = spy(sut);
IndexShard spyShard = spy(indexShard);
Expand Down

0 comments on commit f5156c9

Please sign in to comment.