[CCR] Made shard follow task more resilient against node failure and #31242

Closed · wants to merge 2 commits
@@ -10,11 +10,13 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
@@ -31,17 +33,19 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ActionTransportException;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest;
@@ -58,6 +62,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.Supplier;
@@ -141,7 +146,8 @@ void prepare(Client leaderClient, Client followerClient, ShardFollowNodeTask tas
// TODO: check if both indices have the same history uuid
if (leaderGlobalCheckPoint == followGlobalCheckPoint) {
logger.debug("{} no write operations to fetch", followerShard);
retry(leaderClient, followerClient, task, params, followGlobalCheckPoint, imdVersionChecker);
retry(() -> prepare(leaderClient, followerClient, task, params, followGlobalCheckPoint, imdVersionChecker),
task::markAsFailed);
} else {
assert followGlobalCheckPoint < leaderGlobalCheckPoint : "followGlobalCheckPoint [" + followGlobalCheckPoint +
"] is not below leaderGlobalCheckPoint [" + leaderGlobalCheckPoint + "]";
@@ -156,34 +162,47 @@ void prepare(Client leaderClient, Client followerClient, ShardFollowNodeTask tas
task.markAsFailed(e);
}
};
ChunksCoordinator coordinator = new ChunksCoordinator(followerClient, leaderClient, ccrExecutor, imdVersionChecker,
params.getMaxChunkSize(), params.getNumConcurrentChunks(), params.getProcessorMaxTranslogBytes(), leaderShard,
followerShard, handler);
Consumer<Runnable> scheduler = scheduleTask -> retry(scheduleTask, handler);
ChunksCoordinator coordinator = new ChunksCoordinator(followerClient, leaderClient, scheduler, ccrExecutor,
imdVersionChecker, params.getMaxChunkSize(), params.getNumConcurrentChunks(),
params.getProcessorMaxTranslogBytes(), leaderShard, followerShard, handler, task::isRunning);
coordinator.createChucks(followGlobalCheckPoint, leaderGlobalCheckPoint);
coordinator.start();
}
}, task::markAsFailed);
}

private void retry(Client leaderClient, Client followerClient, ShardFollowNodeTask task, ShardFollowTask params,
long followGlobalCheckPoint,
IndexMetadataVersionChecker imdVersionChecker) {
private void retry(Runnable task, Consumer<Exception> errorHandler) {
threadPool.schedule(RETRY_TIMEOUT, Ccr.CCR_THREAD_POOL_NAME, new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
task.markAsFailed(e);
errorHandler.accept(e);
}

@Override
protected void doRun() throws Exception {
prepare(leaderClient, followerClient, task, params, followGlobalCheckPoint, imdVersionChecker);
task.run();
}
});
}
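
The generalized retry helper above is the heart of the change: instead of a retry method hard-wired to prepare(), any Runnable can now be rescheduled on the CCR thread pool after RETRY_TIMEOUT, and prepare() uses it to poll again once the follower has caught up. A minimal standalone sketch of the same pattern with a plain ScheduledExecutorService (the delay value is an assumption for illustration):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class DelayedRetry {
    // Assumed delay; the PR uses a RETRY_TIMEOUT TimeValue on the CCR thread pool.
    private static final long RETRY_DELAY_MS = 500;

    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    // Schedule the task after a fixed delay; route any failure (including a
    // rejected execution during shutdown) to the caller's error handler,
    // mirroring AbstractRunnable#onFailure above.
    void retry(Runnable task, Consumer<Exception> errorHandler) {
        try {
            executor.schedule(() -> {
                try {
                    task.run();
                } catch (Exception e) {
                    errorHandler.accept(e);
                }
            }, RETRY_DELAY_MS, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            errorHandler.accept(e);
        }
    }
}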

private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer handler, Consumer<Exception> errorHandler) {
fetchGlobalCheckpoint(client, shardId, handler, errorHandler, 0);
}

private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer handler, Consumer<Exception> errorHandler,
int attempt) {
client.admin().indices().stats(new IndicesStatsRequest().indices(shardId.getIndexName()), ActionListener.wrap(r -> {
IndexStats indexStats = r.getIndex(shardId.getIndexName());
if (indexStats == null) {
if (attempt <= 5) {
retry(() -> fetchGlobalCheckpoint(client, shardId, handler, errorHandler, attempt + 1), errorHandler);
} else {
errorHandler.accept(new IllegalArgumentException("Cannot find shard stats for shard " + shardId));
}
return;
}

Optional<ShardStats> filteredShardStats = Arrays.stream(indexStats.getShards())
.filter(shardStats -> shardStats.getShardRouting().shardId().equals(shardId))
.filter(shardStats -> shardStats.getShardRouting().primary())
@@ -193,7 +212,11 @@ private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer
final long globalCheckPoint = filteredShardStats.get().getSeqNoStats().getGlobalCheckpoint();
handler.accept(globalCheckPoint);
} else {
errorHandler.accept(new IllegalArgumentException("Cannot find shard stats for shard " + shardId));
if (attempt <= PROCESSOR_RETRY_LIMIT) {
retry(() -> fetchGlobalCheckpoint(client, shardId, handler, errorHandler, attempt + 1), errorHandler);
} else {
errorHandler.accept(new IllegalArgumentException("Cannot find shard stats for shard " + shardId));
}
}
}, errorHandler));
}
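
Worth noting in the hunk above: a missing IndexStats or missing primary ShardStats no longer fails the task outright, since right after a node failure the stats API can transiently report nothing for the shard. The same attempt-bounded, callback-driven retry can be written in isolation roughly like this (fetchOnce, MAX_ATTEMPTS, and the Retryer interface are illustrative names, not part of the PR):

import java.util.OptionalLong;
import java.util.function.Consumer;
import java.util.function.LongConsumer;

class CheckpointFetcher {
    private static final int MAX_ATTEMPTS = 5; // assumed bound, in the spirit of PROCESSOR_RETRY_LIMIT

    interface Retryer { void schedule(Runnable retryAction, Consumer<Exception> onError); }

    private final Retryer retryer;

    CheckpointFetcher(Retryer retryer) { this.retryer = retryer; }

    void fetch(LongConsumer handler, Consumer<Exception> errorHandler) {
        fetch(handler, errorHandler, 0);
    }

    private void fetch(LongConsumer handler, Consumer<Exception> errorHandler, int attempt) {
        fetchOnce(result -> {
            if (result.isPresent()) {
                handler.accept(result.getAsLong());
            } else if (attempt < MAX_ATTEMPTS) {
                // Stats can be transiently absent (e.g. primary relocating); try again later.
                retryer.schedule(() -> fetch(handler, errorHandler, attempt + 1), errorHandler);
            } else {
                errorHandler.accept(new IllegalStateException(
                    "no shard stats after " + (attempt + 1) + " attempts"));
            }
        });
    }

    // Stand-in for the async IndicesStatsRequest round-trip; empty means "stats not available yet".
    private void fetchOnce(Consumer<OptionalLong> onResult) {
        onResult.accept(OptionalLong.empty());
    }
}
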
@@ -213,16 +236,28 @@ static class ChunksCoordinator {
private final ShardId leaderShard;
private final ShardId followerShard;
private final Consumer<Exception> handler;
private final BooleanSupplier isRunning;
private final Consumer<Runnable> scheduler;

private final CountDown countDown;
private final Queue<long[]> chunks = new ConcurrentLinkedQueue<>();
private final AtomicReference<Exception> failureHolder = new AtomicReference<>();

ChunksCoordinator(Client followerClient, Client leaderClient, Executor ccrExecutor, IndexMetadataVersionChecker imdVersionChecker,
long batchSize, int concurrentProcessors, long processorMaxTranslogBytes, ShardId leaderShard,
ShardId followerShard, Consumer<Exception> handler) {
ChunksCoordinator(Client followerClient,
Client leaderClient,
Consumer<Runnable> scheduler,
Executor ccrExecutor,
IndexMetadataVersionChecker imdVersionChecker,
long batchSize,
int concurrentProcessors,
long processorMaxTranslogBytes,
ShardId leaderShard,
ShardId followerShard,
Consumer<Exception> handler,
BooleanSupplier isRunning) {
this.followerClient = followerClient;
this.leaderClient = leaderClient;
this.scheduler = scheduler;
this.ccrExecutor = ccrExecutor;
this.imdVersionChecker = imdVersionChecker;
this.batchSize = batchSize;
@@ -231,6 +266,7 @@ static class ChunksCoordinator {
this.leaderShard = leaderShard;
this.followerShard = followerShard;
this.handler = handler;
this.isRunning = isRunning;
this.countDown = new CountDown(concurrentProcessors);
}

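The coordinator's job is a bounded fan-out: split the checkpoint gap into fixed-size chunks on a concurrent queue and let concurrentProcessors workers drain it, with the CountDown firing once the last worker stops. A compact sketch of that coordination with JDK types only (the real ChunkProcessor is asynchronous; the synchronous loop here just keeps the sketch short):

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

class ChunkFanOut {
    private final Queue<long[]> chunks = new ConcurrentLinkedQueue<>();
    private final AtomicInteger remainingWorkers; // plays the role of ES's CountDown
    private final int concurrentProcessors;

    ChunkFanOut(int concurrentProcessors) {
        this.concurrentProcessors = concurrentProcessors;
        this.remainingWorkers = new AtomicInteger(concurrentProcessors);
    }

    // Split the inclusive range [from, to] into chunks of at most batchSize operations.
    void createChunks(long from, long to, long batchSize) {
        for (long start = from; start <= to; start += batchSize) {
            chunks.add(new long[]{start, Math.min(start + batchSize - 1, to)});
        }
    }

    void start(Runnable onAllDone) {
        for (int i = 0; i < concurrentProcessors; i++) {
            drain(onAllDone);
        }
    }

    private void drain(Runnable onAllDone) {
        long[] chunk;
        while ((chunk = chunks.poll()) != null) {
            process(chunk[0], chunk[1]); // fetch translog ops for the range and replay them
        }
        if (remainingWorkers.decrementAndGet() == 0) {
            onAllDone.run(); // the last worker to finish reports overall completion
        }
    }

    private void process(long from, long to) {
        // placeholder for the ChunkProcessor round-trip
    }
}
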
@@ -285,8 +321,8 @@ void processNextChunk() {
postProcessChuck(e);
}
};
ChunkProcessor processor = new ChunkProcessor(leaderClient, followerClient, chunks, ccrExecutor, imdVersionChecker,
leaderShard, followerShard, processorHandler);
ChunkProcessor processor = new ChunkProcessor(leaderClient, followerClient, scheduler, chunks, ccrExecutor,
imdVersionChecker, leaderShard, followerShard, processorHandler, isRunning);
processor.start(chunk[0], chunk[1], processorMaxTranslogBytes);
}

@@ -308,28 +344,41 @@ Queue<long[]> getChunks() {

static class ChunkProcessor {

private static final Logger LOGGER = Loggers.getLogger(ChunkProcessor.class);

private final Client leaderClient;
private final Client followerClient;
private final Queue<long[]> chunks;
private final Executor ccrExecutor;
private final BiConsumer<Long, Consumer<Exception>> indexVersionChecker;
private final BooleanSupplier isRunning;
private final Consumer<Runnable> scheduler;

private final ShardId leaderShard;
private final ShardId followerShard;
private final Consumer<Exception> handler;
final AtomicInteger retryCounter = new AtomicInteger(0);

ChunkProcessor(Client leaderClient, Client followerClient, Queue<long[]> chunks, Executor ccrExecutor,
ChunkProcessor(Client leaderClient,
Client followerClient,
Consumer<Runnable> scheduler,
Queue<long[]> chunks,
Executor ccrExecutor,
BiConsumer<Long, Consumer<Exception>> indexVersionChecker,
ShardId leaderShard, ShardId followerShard, Consumer<Exception> handler) {
ShardId leaderShard,
ShardId followerShard,
Consumer<Exception> handler,
BooleanSupplier isRunning) {
this.leaderClient = leaderClient;
this.followerClient = followerClient;
this.scheduler = scheduler;
this.chunks = chunks;
this.ccrExecutor = ccrExecutor;
this.indexVersionChecker = indexVersionChecker;
this.leaderShard = leaderShard;
this.followerShard = followerShard;
this.handler = handler;
this.isRunning = isRunning;
}

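Passing the scheduler in as a Consumer<Runnable> and the cancellation check as a BooleanSupplier, rather than handing the processor a ThreadPool and the task itself, also makes the retry behavior easy to pin down in a unit test. A hypothetical wiring (not taken from the PR's tests) that runs retries inline and never cancels; the client, queue, executor, checker, shard, and handler variables are assumed to be in scope:

// Hypothetical test wiring, matching the constructor order in the hunk above.
Consumer<Runnable> inlineScheduler = Runnable::run;   // no delay: retries execute immediately
BooleanSupplier alwaysRunning = () -> true;           // the follow task is never cancelled

ChunkProcessor processor = new ChunkProcessor(leaderClient, followerClient, inlineScheduler,
    chunks, ccrExecutor, indexVersionChecker, leaderShard, followerShard, handler, alwaysRunning);
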
void start(final long from, final long to, final long maxTranslogsBytes) {
Expand All @@ -347,17 +396,7 @@ public void onResponse(ShardChangesAction.Response response) {

@Override
public void onFailure(Exception e) {
assert e != null;
if (shouldRetry(e)) {
if (retryCounter.incrementAndGet() <= PROCESSOR_RETRY_LIMIT) {
start(from, to, maxTranslogsBytes);
} else {
handler.accept(new ElasticsearchException("retrying failed [" + retryCounter.get() +
"] times, aborting...", e));
}
} else {
handler.accept(e);
}
retryOrFail(e, () -> start(from, to, maxTranslogsBytes));
}
});
}
@@ -382,28 +421,20 @@ public void onFailure(Exception e) {
protected void doRun() throws Exception {
indexVersionChecker.accept(response.getIndexMetadataVersion(), e -> {
if (e != null) {
if (shouldRetry(e) && retryCounter.incrementAndGet() <= PROCESSOR_RETRY_LIMIT) {
handleResponse(to, response);
} else {
handler.accept(new ElasticsearchException("retrying failed [" + retryCounter.get() +
"] times, aborting...", e));
}
retryOrFail(e, () -> handleResponse(to, response));
return;
}
final BulkShardOperationsRequest request = new BulkShardOperationsRequest(followerShard, response.getOperations());
followerClient.execute(BulkShardOperationsAction.INSTANCE, request,
new ActionListener<BulkShardOperationsResponse>() {
@Override
public void onResponse(final BulkShardOperationsResponse bulkShardOperationsResponse) {
handler.accept(null);
}
@Override
public void onResponse(final BulkShardOperationsResponse bulkShardOperationsResponse) {
handler.accept(null);
}

@Override
public void onFailure(final Exception e) {
// No retry mechanism here, because if a failure is being redirected to this place it is considered
// non recoverable.
assert e != null;
handler.accept(e);
retryOrFail(e, () -> handleResponse(to, response));
}
}
);
@@ -412,10 +443,32 @@ public void onFailure(final Exception e) {
});
}

void retryOrFail(Exception e, Runnable retryAction) {
assert e != null;
if (shouldRetry(e)) {
if (canRetry()) {
LOGGER.debug(() -> new ParameterizedMessage("{} Retrying [{}]...", leaderShard, retryCounter.get()), e);
scheduler.accept(retryAction);
} else {
handler.accept(new ElasticsearchException("retrying failed [" + retryCounter.get() + "] times, aborting...", e));
}
} else {
handler.accept(e);
}
}

boolean shouldRetry(Exception e) {
// TODO: What other exceptions should be retried?
return NetworkExceptionHelper.isConnectException(e) ||
NetworkExceptionHelper.isCloseConnectionException(e);
NetworkExceptionHelper.isCloseConnectionException(e) ||
e instanceof ActionTransportException ||
e instanceof NodeClosedException ||
e instanceof UnavailableShardsException ||
e instanceof NoShardAvailableActionException;
}

boolean canRetry() {
return isRunning.getAsBoolean() && retryCounter.incrementAndGet() <= PROCESSOR_RETRY_LIMIT;
}

}
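
Taken together, retryOrFail/shouldRetry/canRetry implement a three-way decision: non-retryable errors fail immediately, retryable errors under budget are rescheduled, and retryable errors past the limit (or on a cancelled task) fail with a wrapping exception that records the attempt count. The same policy in isolation, with a generic stand-in for the PR's whitelist of transport- and shard-level exceptions (the retry limit value is an assumption):

import java.net.ConnectException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

class RetryPolicy {
    private static final int RETRY_LIMIT = 5; // assumed value for PROCESSOR_RETRY_LIMIT

    private final AtomicInteger retryCounter = new AtomicInteger(0);
    private final BooleanSupplier isRunning;
    private final Consumer<Runnable> scheduler;
    private final Consumer<Exception> failureHandler;

    RetryPolicy(BooleanSupplier isRunning, Consumer<Runnable> scheduler, Consumer<Exception> failureHandler) {
        this.isRunning = isRunning;
        this.scheduler = scheduler;
        this.failureHandler = failureHandler;
    }

    void retryOrFail(Exception e, Runnable retryAction) {
        if (!shouldRetry(e)) {
            failureHandler.accept(e);      // non-recoverable: surface right away
        } else if (canRetry()) {
            scheduler.accept(retryAction); // recoverable and under budget: reschedule
        } else {
            failureHandler.accept(new IllegalStateException(
                "retrying failed [" + retryCounter.get() + "] times, aborting...", e));
        }
    }

    boolean shouldRetry(Exception e) {
        // Stand-in for the PR's list: connect/close network exceptions, ActionTransportException,
        // NodeClosedException, UnavailableShardsException, NoShardAvailableActionException.
        return e instanceof ConnectException;
    }

    boolean canRetry() {
        // A cancelled task stops retrying even if attempts remain.
        return isRunning.getAsBoolean() && retryCounter.incrementAndGet() <= RETRY_LIMIT;
    }
}
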
@@ -471,7 +524,7 @@ static final class IndexMetadataVersionChecker implements BiConsumer<Long, Consu

public void accept(Long minimumRequiredIndexMetadataVersion, Consumer<Exception> handler) {
if (currentIndexMetadataVersion.get() >= minimumRequiredIndexMetadataVersion) {
LOGGER.debug("Current index metadata version [{}] is higher or equal than minimum required index metadata version [{}]",
LOGGER.trace("Current index metadata version [{}] is higher or equal than minimum required index metadata version [{}]",
currentIndexMetadataVersion.get(), minimumRequiredIndexMetadataVersion);
handler.accept(null);
} else {