Skip to content

Commit

Permalink
Adding PrimaryMode check before publishing checkpoint.
Browse files Browse the repository at this point in the history
Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
  • Loading branch information
Rishikesh1159 committed Aug 8, 2022
1 parent c665e7c commit e0dcca6
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void beforeRefresh() throws IOException {

@Override
public void afterRefresh(boolean didRefresh) throws IOException {
if (didRefresh) {
if (didRefresh && shard.getReplicationTracker().isPrimaryMode()) {
publisher.publish(shard);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,10 @@
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.oneOf;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting;
import static org.opensearch.common.lucene.Lucene.cleanLuceneIndex;
import static org.opensearch.common.xcontent.ToXContent.EMPTY_PARAMS;
Expand Down Expand Up @@ -728,9 +731,9 @@ public void testOperationPermitsOnPrimaryShards() throws Exception {
final long newPrimaryTerm = indexShard.getPendingPrimaryTerm() + between(1, 1000);
CountDownLatch latch = new CountDownLatch(1);
indexShard.updateShardState(primaryRouting, newPrimaryTerm, (shard, listener) -> {
assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(newPrimaryTerm));
latch.countDown();
},
assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(newPrimaryTerm));
latch.countDown();
},
0L,
Collections.singleton(indexShard.routingEntry().allocationId().getId()),
new IndexShardRoutingTable.Builder(indexShard.shardId()).addShard(primaryRouting).build()
Expand Down Expand Up @@ -3522,7 +3525,43 @@ public void testCheckpointRefreshListenerWithNull() throws IOException {
}

/**
* creates a new initializing shard. The shard will will be put in its proper path under the
* here we are starting a new primary shard in PrimaryMode and testing if the shard publishes checkpoint after refresh.
*/
public void testPublishCheckpointOnPrimaryMode() throws IOException {
final SegmentReplicationCheckpointPublisher mock = mock(SegmentReplicationCheckpointPublisher.class);
IndexShard shard = newStartedShard(p -> newShard(mock), true);
CheckpointRefreshListener refreshListener = new CheckpointRefreshListener(shard, mock);
refreshListener.afterRefresh(true);

// verify checkpoint is published
verify(mock, times(1)).publish(any());
closeShards(shard);
}

/**
* here we are starting a new primary shard in PrimaryMode intially and starting relocation handoff. Later we complete relocation handoff then shard is no longer
* in PrimaryMode, and we test if the shard does not publish checkpoint after refresh.
*/
public void testPublishCheckpointAfterRelocationHandOff() throws IOException {
final SegmentReplicationCheckpointPublisher mock = mock(SegmentReplicationCheckpointPublisher.class);
IndexShard shard = newStartedShard(p -> newShard(mock), true);
CheckpointRefreshListener refreshListener = new CheckpointRefreshListener(shard, mock);
String id = shard.routingEntry().allocationId().getId();

// Starting relocation handoff
shard.getReplicationTracker().startRelocationHandoff(id);

// Completing relocation handoff
shard.getReplicationTracker().completeRelocationHandoff();
refreshListener.afterRefresh(true);

// verify checkpoint is not published
verify(mock, times(0)).publish(any());
closeShards(shard);
}

/**
* creates a new initializing shard. The shard will be put in its proper path under the
* current node id the shard is assigned to.
* @param checkpointPublisher Segment Replication Checkpoint Publisher to publish checkpoint
*/
Expand Down

0 comments on commit e0dcca6

Please sign in to comment.