Skip to content

Commit

Permalink
Fix bug where retries within RemoteStoreRefreshListener cause infos/c…
Browse files Browse the repository at this point in the history
…heckpoint mismatch (#10655)

* Fix bug where retries within RemoteStoreRefreshListener cause mismatch between ReplicationCheckpoint and uploaded SegmentInfos.

Retries within RemoteStoreRefreshListener run outside of the refresh thread.  This means that concurrent refreshes
may occur during syncSegments execution updating the on-reader SegmentInfos.  A shard's latest ReplicationCheckpoint
is computed and set in a refresh listener, but it is not guaranteed the listener has run before the retry fetches the infos or checkpoint independently.
This fix ensures the listener recomputes the checkpoint while fetching the SegmentInfos. This change also
ensures that we only recompute the checkpoint when necessary because it comes with an IO cost to compute StoreFileMetadata.

Signed-off-by: Marc Handalian <handalm@amazon.com>

Update refresh listener to recompute checkpoint from latest infos snapshot.

Signed-off-by: Marc Handalian <handalm@amazon.com>

Fix broken test case by comparing segments gen

Signed-off-by: Marc Handalian <handalm@amazon.com>

spotless

Signed-off-by: Marc Handalian <handalm@amazon.com>

Fix RemoteStoreRefreshListener tests

Signed-off-by: Marc Handalian <handalm@amazon.com>

* add extra log

Signed-off-by: Marc Handalian <handalm@amazon.com>

---------

Signed-off-by: Marc Handalian <handalm@amazon.com>
  • Loading branch information
mch2 committed Oct 19, 2023
1 parent da24ca7 commit e389a09
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 34 deletions.
67 changes: 40 additions & 27 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1608,8 +1608,11 @@ public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineExcepti
}

/**
* Compute and return the latest ReplicationCheckpoint for a particular shard.
* @return EMPTY checkpoint before the engine is opened and null for non-segrep enabled indices
* return the most recently computed ReplicationCheckpoint for a particular shard.
* The checkpoint is updated inside a refresh listener and may lag behind the SegmentInfos on the reader.
* To guarantee the checkpoint is upto date with the latest on-reader infos, use `getLatestSegmentInfosAndCheckpoint` instead.
*
* @return {@link ReplicationCheckpoint} - The most recently computed ReplicationCheckpoint.
*/
public ReplicationCheckpoint getLatestReplicationCheckpoint() {
return replicationTracker.getLatestReplicationCheckpoint();
Expand All @@ -1628,34 +1631,12 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegmentInfosAndCheckpoint() {
assert indexSettings.isSegRepEnabled();

Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> nullSegmentInfosEmptyCheckpoint = new Tuple<>(
new GatedCloseable<>(null, () -> {}),
getLatestReplicationCheckpoint()
);

if (getEngineOrNull() == null) {
return nullSegmentInfosEmptyCheckpoint;
}
// do not close the snapshot - caller will close it.
GatedCloseable<SegmentInfos> snapshot = null;
try {
snapshot = getSegmentInfosSnapshot();
if (snapshot.get() != null) {
SegmentInfos segmentInfos = snapshot.get();
final Map<String, StoreFileMetadata> metadataMap = store.getSegmentMetadataMap(segmentInfos);
return new Tuple<>(
snapshot,
new ReplicationCheckpoint(
this.shardId,
getOperationPrimaryTerm(),
segmentInfos.getGeneration(),
segmentInfos.getVersion(),
metadataMap.values().stream().mapToLong(StoreFileMetadata::length).sum(),
getEngine().config().getCodec().getName(),
metadataMap
)
);
}
final SegmentInfos segmentInfos = snapshot.get();
return new Tuple<>(snapshot, computeReplicationCheckpoint(segmentInfos));
} catch (IOException | AlreadyClosedException e) {
logger.error("Error Fetching SegmentInfos and latest checkpoint", e);
if (snapshot != null) {
Expand All @@ -1666,7 +1647,39 @@ public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegme
}
}
}
return nullSegmentInfosEmptyCheckpoint;
return new Tuple<>(new GatedCloseable<>(null, () -> {}), getLatestReplicationCheckpoint());
}

/**
* Compute the latest {@link ReplicationCheckpoint} from a SegmentInfos.
* This function fetches a metadata snapshot from the store that comes with an IO cost.
* We will reuse the existing stored checkpoint if it is at the same SI version.
*
* @param segmentInfos {@link SegmentInfos} infos to use to compute.
* @return {@link ReplicationCheckpoint} Checkpoint computed from the infos.
* @throws IOException When there is an error computing segment metadata from the store.
*/
ReplicationCheckpoint computeReplicationCheckpoint(SegmentInfos segmentInfos) throws IOException {
if (segmentInfos == null) {
return ReplicationCheckpoint.empty(shardId);
}
final ReplicationCheckpoint latestReplicationCheckpoint = getLatestReplicationCheckpoint();
if (latestReplicationCheckpoint.getSegmentInfosVersion() == segmentInfos.getVersion()
&& latestReplicationCheckpoint.getSegmentsGen() == segmentInfos.getGeneration()) {
return latestReplicationCheckpoint;
}
final Map<String, StoreFileMetadata> metadataMap = store.getSegmentMetadataMap(segmentInfos);
final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(
this.shardId,
getOperationPrimaryTerm(),
segmentInfos.getGeneration(),
segmentInfos.getVersion(),
metadataMap.values().stream().mapToLong(StoreFileMetadata::length).sum(),
getEngine().config().getCodec().getName(),
metadataMap
);
logger.trace("Recomputed ReplicationCheckpoint for shard {}", checkpoint);
return checkpoint;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ private boolean syncSegments() {
// in the remote store.
return indexShard.state() != IndexShardState.STARTED || !(indexShard.getEngine() instanceof InternalEngine);
}
ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint();
beforeSegmentsSync();
long refreshTimeMs = segmentTracker.getLocalRefreshTimeMs(), refreshClockTimeMs = segmentTracker.getLocalRefreshClockTimeMs();
long refreshSeqNo = segmentTracker.getLocalRefreshSeqNo();
Expand All @@ -199,10 +198,7 @@ private boolean syncSegments() {

try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
assert segmentInfos.getGeneration() == checkpoint.getSegmentsGen() : "SegmentInfos generation: "
+ segmentInfos.getGeneration()
+ " does not match metadata generation: "
+ checkpoint.getSegmentsGen();
final ReplicationCheckpoint checkpoint = indexShard.computeReplicationCheckpoint(segmentInfos);
// Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can
// move.
long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,8 +520,8 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
if (counter.incrementAndGet() <= succeedOnAttempt) {
throw new RuntimeException("Inducing failure in upload");
}
return indexShard.getLatestSegmentInfosAndCheckpoint();
})).when(shard).getLatestSegmentInfosAndCheckpoint();
return indexShard.getLatestReplicationCheckpoint();
})).when(shard).computeReplicationCheckpoint(any());

doAnswer(invocation -> {
if (Objects.nonNull(successLatch)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,33 @@ public void testSnapshotWhileFailoverIncomplete() throws Exception {
}
}

public void testReuseReplicationCheckpointWhenLatestInfosIsUnChanged() throws Exception {
try (ReplicationGroup shards = createGroup(1, settings, indexMapping, new NRTReplicationEngineFactory(), createTempDir())) {
final IndexShard primaryShard = shards.getPrimary();
shards.startAll();
shards.indexDocs(10);
shards.refresh("test");
replicateSegments(primaryShard, shards.getReplicas());
shards.assertAllEqual(10);
final ReplicationCheckpoint latestReplicationCheckpoint = primaryShard.getLatestReplicationCheckpoint();
try (GatedCloseable<SegmentInfos> segmentInfosSnapshot = primaryShard.getSegmentInfosSnapshot()) {
assertEquals(latestReplicationCheckpoint, primaryShard.computeReplicationCheckpoint(segmentInfosSnapshot.get()));
}
final Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> latestSegmentInfosAndCheckpoint = primaryShard
.getLatestSegmentInfosAndCheckpoint();
try (final GatedCloseable<SegmentInfos> closeable = latestSegmentInfosAndCheckpoint.v1()) {
assertEquals(latestReplicationCheckpoint, primaryShard.computeReplicationCheckpoint(closeable.get()));
}
}
}

public void testComputeReplicationCheckpointNullInfosReturnsEmptyCheckpoint() throws Exception {
try (ReplicationGroup shards = createGroup(1, settings, indexMapping, new NRTReplicationEngineFactory(), createTempDir())) {
final IndexShard primaryShard = shards.getPrimary();
assertEquals(ReplicationCheckpoint.empty(primaryShard.shardId), primaryShard.computeReplicationCheckpoint(null));
}
}

private SnapshotShardsService getSnapshotShardsService(IndexShard replicaShard) {
final TransportService transportService = mock(TransportService.class);
when(transportService.getThreadPool()).thenReturn(threadPool);
Expand Down

0 comments on commit e389a09

Please sign in to comment.