Skip to content

Commit

Permalink
Add changelog entry
Browse files Browse the repository at this point in the history
Signed-off-by: Suraj Singh <surajrider@gmail.com>
  • Loading branch information
dreamer-89 committed Aug 31, 2022
1 parent 970e4de commit b1bc5bb
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Add timeout on Mockito.verify to reduce flakyness in testReplicationOnDone test([#4314](https://github.com/opensearch-project/OpenSearch/pull/4314))
- Commit workflow for dependabot changelog helper ([#4331](https://github.com/opensearch-project/OpenSearch/pull/4331))
- Fixed cancellation of segment replication events ([#4225](https://github.com/opensearch-project/OpenSearch/pull/4225))
- [Segment Replication] Add check to cancel ongoing replication with old primary on onNewCheckpoint on replica ([#4363](https://github.com/opensearch-project/OpenSearch/pull/4363))

### Security
- CVE-2022-25857 org.yaml:snakeyaml DOS vulnerability ([#4341](https://github.com/opensearch-project/OpenSearch/pull/4341))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public class SegmentReplicationTargetService implements IndexEventListener {

private final Map<ShardId, ReplicationCheckpoint> latestReceivedCheckpoint = ConcurrentCollections.newConcurrentMap();


// Empty Implementation, only required while Segment Replication is under feature flag.
public static final SegmentReplicationTargetService NO_OP = new SegmentReplicationTargetService() {
@Override
Expand Down Expand Up @@ -153,12 +152,17 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe
} else {
latestReceivedCheckpoint.put(replicaShard.shardId(), receivedCheckpoint);
}
Optional<SegmentReplicationTarget> ongoingReplicationTarget = onGoingReplications.getOngoingReplicationTarget(replicaShard.shardId());
Optional<SegmentReplicationTarget> ongoingReplicationTarget = onGoingReplications.getOngoingReplicationTarget(
replicaShard.shardId()
);
if (ongoingReplicationTarget.isPresent()) {
final SegmentReplicationTarget target = ongoingReplicationTarget.get();
if (target.getCheckpoint().getPrimaryTerm() < receivedCheckpoint.getPrimaryTerm()) {
logger.info("Cancelling ongoing replication from old primary with primary term {}", target.getCheckpoint().getPrimaryTerm());
target.cancel("Stuck target after new primary");
logger.info(
"Cancelling ongoing replication from old primary with primary term {}",
target.getCheckpoint().getPrimaryTerm()
);
target.cancel("Cancelling stuck target after new primary");
} else {
logger.trace(
() -> new ParameterizedMessage(
Expand All @@ -169,7 +173,6 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe
return;
}
}

final Thread thread = Thread.currentThread();
if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) {
startReplication(receivedCheckpoint, replicaShard, new SegmentReplicationListener() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public void testOnNewCheckpointFromNewPrimaryCancelOngoingReplication() throws I
// wait for the new checkpoint to arrive, before the listener completes.
latch.await(5, TimeUnit.SECONDS);
doNothing().when(targetSpy).startReplication(any());
verify(targetSpy, times(1)).cancel(any());
verify(targetSpy, times(1)).cancel("Cancelling stuck target after new primary");
verify(serviceSpy, times(1)).startReplication(eq(newPrimaryCheckpoint), eq(replicaShard), any());
closeShards(replicaShard);
}
Expand Down

0 comments on commit b1bc5bb

Please sign in to comment.