Extract replicator logic from SegmentReplicationTargetService #15511

Merged 2 commits on Aug 30, 2024
server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java

@@ -11,9 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
- import org.apache.lucene.index.CorruptIndexException;
import org.opensearch.ExceptionsHelper;
- import org.opensearch.OpenSearchCorruptionException;
import org.opensearch.action.support.ChannelActionListener;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateListener;
@@ -24,7 +22,6 @@
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.CancellableThreads;
- import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.shard.ShardId;
@@ -33,7 +30,6 @@
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardState;
- import org.opensearch.index.store.Store;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.recovery.ForceSyncRequest;
@@ -61,7 +57,7 @@
import static org.opensearch.indices.replication.SegmentReplicationSourceService.Actions.UPDATE_VISIBLE_CHECKPOINT;

/**
- * Service class that orchestrates replication events on replicas.
+ * Service class that handles incoming checkpoints to initiate replication events on replicas.
*
* @opensearch.internal
*/
@@ -72,17 +68,14 @@
private final ThreadPool threadPool;
private final RecoverySettings recoverySettings;

- private final ReplicationCollection<SegmentReplicationTarget> onGoingReplications;
-
- private final Map<ShardId, SegmentReplicationState> completedReplications = ConcurrentCollections.newConcurrentMap();
-
private final SegmentReplicationSourceFactory sourceFactory;

protected final Map<ShardId, ReplicationCheckpoint> latestReceivedCheckpoint = ConcurrentCollections.newConcurrentMap();

private final IndicesService indicesService;
private final ClusterService clusterService;
private final TransportService transportService;
+ private final SegmentReplicator replicator;

/**
* The internal actions
@@ -94,6 +87,7 @@
public static final String FORCE_SYNC = "internal:index/shard/replication/segments_sync";
}

+ @Deprecated
public SegmentReplicationTargetService(
final ThreadPool threadPool,
final RecoverySettings recoverySettings,
@@ -113,6 +107,7 @@
);
}

+ @Deprecated
public SegmentReplicationTargetService(
final ThreadPool threadPool,
final RecoverySettings recoverySettings,
@@ -121,14 +116,34 @@
final IndicesService indicesService,
final ClusterService clusterService,
final ReplicationCollection<SegmentReplicationTarget> ongoingSegmentReplications
) {
+ this(
+ threadPool,
+ recoverySettings,
+ transportService,
+ sourceFactory,
+ indicesService,
+ clusterService,
+ new SegmentReplicator(threadPool)
+ );
+ }
+
+ public SegmentReplicationTargetService(
+ final ThreadPool threadPool,
+ final RecoverySettings recoverySettings,
+ final TransportService transportService,
+ final SegmentReplicationSourceFactory sourceFactory,
+ final IndicesService indicesService,
+ final ClusterService clusterService,
+ final SegmentReplicator replicator
+ ) {
this.threadPool = threadPool;
this.recoverySettings = recoverySettings;
- this.onGoingReplications = ongoingSegmentReplications;
this.sourceFactory = sourceFactory;
this.indicesService = indicesService;
this.clusterService = clusterService;
this.transportService = transportService;
+ this.replicator = replicator;

transportService.registerRequestHandler(
Actions.FILE_CHUNK,
@@ -154,7 +169,7 @@
@Override
protected void doStop() {
if (DiscoveryNode.isDataNode(clusterService.getSettings())) {
- assert onGoingReplications.size() == 0 : "Replication collection should be empty on shutdown";
+ assert replicator.size() == 0 : "Replication collection should be empty on shutdown";
clusterService.removeListener(this);
}
}
@@ -199,7 +214,7 @@
@Override
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {
if (indexShard != null && indexShard.indexSettings().isSegRepEnabledOrRemoteNode()) {
onGoingReplications.cancelForShard(indexShard.shardId(), "Shard closing");
replicator.cancel(indexShard.shardId(), "Shard closing");
latestReceivedCheckpoint.remove(shardId);
}
}
@@ -224,7 +239,7 @@
&& indexShard.indexSettings().isSegRepEnabledOrRemoteNode()
&& oldRouting.primary() == false
&& newRouting.primary()) {
onGoingReplications.cancelForShard(indexShard.shardId(), "Shard has been promoted to primary");
replicator.cancel(indexShard.shardId(), "Shard has been promoted to primary");
latestReceivedCheckpoint.remove(indexShard.shardId());
}
}
@@ -234,17 +249,15 @@
*/
@Nullable
public SegmentReplicationState getOngoingEventSegmentReplicationState(ShardId shardId) {
- return Optional.ofNullable(onGoingReplications.getOngoingReplicationTarget(shardId))
- .map(SegmentReplicationTarget::state)
- .orElse(null);
+ return Optional.ofNullable(replicator.get(shardId)).map(SegmentReplicationTarget::state).orElse(null);
}

/**
* returns SegmentReplicationState of latest completed segment replication events.
*/
@Nullable
public SegmentReplicationState getlatestCompletedEventSegmentReplicationState(ShardId shardId) {
- return completedReplications.get(shardId);
+ return replicator.getCompleted(shardId);
}

/**
@@ -257,11 +270,11 @@
}

public ReplicationRef<SegmentReplicationTarget> get(long replicationId) {
- return onGoingReplications.get(replicationId);
+ return replicator.get(replicationId);
}

public SegmentReplicationTarget get(ShardId shardId) {
- return onGoingReplications.getOngoingReplicationTarget(shardId);
+ return replicator.get(shardId);
}

/**
@@ -285,7 +298,7 @@
// checkpoint to be replayed once the shard is Active.
if (replicaShard.state().equals(IndexShardState.STARTED) == true) {
// Checks if received checkpoint is already present and ahead then it replaces old received checkpoint
- SegmentReplicationTarget ongoingReplicationTarget = onGoingReplications.getOngoingReplicationTarget(replicaShard.shardId());
+ SegmentReplicationTarget ongoingReplicationTarget = replicator.get(replicaShard.shardId());
if (ongoingReplicationTarget != null) {
if (ongoingReplicationTarget.getCheckpoint().getPrimaryTerm() < receivedCheckpoint.getPrimaryTerm()) {
logger.debug(
@@ -504,28 +517,12 @@
final ReplicationCheckpoint checkpoint,
final SegmentReplicationListener listener
) {
- final SegmentReplicationTarget target = new SegmentReplicationTarget(
- indexShard,
- checkpoint,
- sourceFactory.get(indexShard),
- listener
- );
- startReplication(target);
- return target;
+ return replicator.startReplication(indexShard, checkpoint, sourceFactory.get(indexShard), listener);
}

// pkg-private for integration tests
void startReplication(final SegmentReplicationTarget target) {
- final long replicationId;
- try {
- replicationId = onGoingReplications.startSafe(target, recoverySettings.activityTimeout());
- } catch (ReplicationFailedException e) {
- // replication already running for shard.
- target.fail(e, false);
- return;
- }
- logger.trace(() -> new ParameterizedMessage("Added new replication to collection {}", target.description()));
- threadPool.generic().execute(new ReplicationRunner(replicationId));
+ replicator.startReplication(target, recoverySettings.activityTimeout());
}

/**
@@ -550,89 +547,14 @@
void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure);
}

- /**
- * Runnable implementation to trigger a replication event.
- */
- private class ReplicationRunner extends AbstractRunnable {
-
- final long replicationId;
-
- public ReplicationRunner(long replicationId) {
- this.replicationId = replicationId;
- }
-
- @Override
- public void onFailure(Exception e) {
- onGoingReplications.fail(replicationId, new ReplicationFailedException("Unexpected Error during replication", e), false);
- }
-
- @Override
- public void doRun() {
- start(replicationId);
- }
- }
-
- private void start(final long replicationId) {
- final SegmentReplicationTarget target;
- try (ReplicationRef<SegmentReplicationTarget> replicationRef = onGoingReplications.get(replicationId)) {
- // This check is for handling edge cases where the reference is removed before the ReplicationRunner is started by the
- // threadpool.
- if (replicationRef == null) {
- return;
- }
- target = replicationRef.get();
- }
- target.startReplication(new ActionListener<>() {
- @Override
- public void onResponse(Void o) {
- logger.debug(() -> new ParameterizedMessage("Finished replicating {} marking as done.", target.description()));
- onGoingReplications.markAsDone(replicationId);
- if (target.state().getIndex().recoveredFileCount() != 0 && target.state().getIndex().recoveredBytes() != 0) {
- completedReplications.put(target.shardId(), target.state());
- }
- }
-
- @Override
- public void onFailure(Exception e) {
- logger.debug("Replication failed {}", target.description());
- if (isStoreCorrupt(target) || e instanceof CorruptIndexException || e instanceof OpenSearchCorruptionException) {
- onGoingReplications.fail(replicationId, new ReplicationFailedException("Store corruption during replication", e), true);
- return;
- }
- onGoingReplications.fail(replicationId, new ReplicationFailedException("Segment Replication failed", e), false);
- }
- });
- }
-
- private boolean isStoreCorrupt(SegmentReplicationTarget target) {
- // ensure target is not already closed. In that case
- // we can assume the store is not corrupt and that the replication
- // event completed successfully.
- if (target.refCount() > 0) {
- final Store store = target.store();
- if (store.tryIncRef()) {
- try {
- return store.isMarkedCorrupted();
- } catch (IOException ex) {
- logger.warn("Unable to determine if store is corrupt", ex);
- return false;
- } finally {
- store.decRef();
- }
- }
- }
- // store already closed.
- return false;
- }
-
private class FileChunkTransportRequestHandler implements TransportRequestHandler<FileChunkRequest> {

// How many bytes we've copied since we last called RateLimiter.pause
final AtomicLong bytesSinceLastPause = new AtomicLong();

@Override
public void messageReceived(final FileChunkRequest request, TransportChannel channel, Task task) throws Exception {
- try (ReplicationRef<SegmentReplicationTarget> ref = onGoingReplications.getSafe(request.recoveryId(), request.shardId())) {
+ try (ReplicationRef<SegmentReplicationTarget> ref = replicator.get(request.recoveryId(), request.shardId())) {
final SegmentReplicationTarget target = ref.get();
final ActionListener<Void> listener = target.createOrFinishListener(channel, Actions.FILE_CHUNK, request);
target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.replicationRateLimiter(), listener);
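For context on the pattern this diff introduces, here is a minimal, self-contained sketch of the delegation: ownership of the ongoing and completed replication state moves into a dedicated replicator component, and the service forwards lookups and cancellations to it. All classes below are simplified stand-ins rather than the real OpenSearch types; the method names (`get`, `getCompleted`, `cancel`, `size`, `startReplication`) loosely mirror the calls visible in the diff, but signatures and state handling are deliberately reduced.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in for the state tracked per replication event.
class ReplicationState {}

// Stand-in for SegmentReplicator: the single owner of ongoing and
// completed replication state after this refactor.
class Replicator {
    private final Map<String, ReplicationState> ongoing = new ConcurrentHashMap<>();
    private final Map<String, ReplicationState> completed = new ConcurrentHashMap<>();

    ReplicationState get(String shardId) { return ongoing.get(shardId); }

    ReplicationState getCompleted(String shardId) { return completed.get(shardId); }

    int size() { return ongoing.size(); }

    void startReplication(String shardId) { ongoing.put(shardId, new ReplicationState()); }

    void cancel(String shardId, String reason) {
        // The real implementation also fails the in-flight target with `reason`.
        ongoing.remove(shardId);
    }

    void markDone(String shardId) {
        ReplicationState state = ongoing.remove(shardId);
        if (state != null) {
            completed.put(shardId, state);
        }
    }
}

// Stand-in for SegmentReplicationTargetService: it no longer owns the
// collections and simply forwards to the injected replicator.
class TargetService {
    private final Replicator replicator;

    TargetService(Replicator replicator) { this.replicator = replicator; }

    ReplicationState ongoingState(String shardId) { return replicator.get(shardId); }

    ReplicationState latestCompletedState(String shardId) { return replicator.getCompleted(shardId); }

    void beforeShardClosed(String shardId) { replicator.cancel(shardId, "Shard closing"); }
}
```

The benefit mirrors the PR title: replication bookkeeping lives in one reusable component, the transport-facing service shrinks to checkpoint handling and request routing, and the replicator can be constructed and exercised independently, which is what the new non-deprecated constructor in the diff enables.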