[CCR] Refactor ChunksCoordinator to continuously look for changes in leader shard #30898

Closed
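The diff below reworks ShardFollowTasksExecutor so that the ChunksCoordinator no longer processes a single checkpoint range and then bounces back through prepare()/retry(); instead the coordinator itself keeps polling the leader's global checkpoint (every 3 seconds via CHECK_LEADER_GLOBAL_CHECKPOINT_INTERVAL), enqueues new chunks whenever the checkpoint advances, and tops up the pool of chunk workers. The sketch below is a minimal, JDK-only illustration of that polling loop, not the PR's actual API; the class and member names in it (PollingChunksCoordinatorSketch, scheduleNextCheck, startWorkers, and so on) are invented for the example.

```java
// Minimal sketch of the continuous polling loop this PR introduces, using only JDK types.
// All names here are illustrative; they are not the identifiers used in the real change.
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

final class PollingChunksCoordinatorSketch {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Queue<long[]> chunkQueue = new ConcurrentLinkedQueue<>();
    private final LongSupplier leaderGlobalCheckpoint; // stands in for fetchGlobalCheckpoint(...)
    private final BooleanSupplier taskIsRunning;       // stands in for task::isRunning
    private final long batchSize;

    PollingChunksCoordinatorSketch(LongSupplier leaderGlobalCheckpoint, BooleanSupplier taskIsRunning, long batchSize) {
        this.leaderGlobalCheckpoint = leaderGlobalCheckpoint;
        this.taskIsRunning = taskIsRunning;
        this.batchSize = batchSize;
    }

    void start(long followerCheckpoint, long leaderCheckpoint) {
        createChunks(followerCheckpoint, leaderCheckpoint);
        startWorkers();
        scheduleNextCheck(leaderCheckpoint);
    }

    private void scheduleNextCheck(long previousCheckpoint) {
        scheduler.schedule(() -> {
            if (taskIsRunning.getAsBoolean() == false) {
                chunkQueue.clear(); // task was cancelled: stop polling and drop pending work
                return;
            }
            long current = leaderGlobalCheckpoint.getAsLong();
            if (current != previousCheckpoint) {
                createChunks(previousCheckpoint, current); // new operations on the leader: enqueue them
                startWorkers();
                scheduleNextCheck(current);
            } else {
                scheduleNextCheck(previousCheckpoint);     // nothing new: keep polling
            }
        }, 3, TimeUnit.SECONDS); // mirrors CHECK_LEADER_GLOBAL_CHECKPOINT_INTERVAL
    }

    private void createChunks(long from, long to) {
        // Same partitioning as the coordinator: inclusive [start, end] pairs of batchSize operations.
        for (long i = from; i < to; i += batchSize) {
            long end = i + batchSize <= to ? i + batchSize - 1 : to;
            chunkQueue.add(new long[] {i, end});
        }
    }

    private void startWorkers() {
        // In the PR this tops up the worker count to maxConcurrentWorker via an AtomicInteger;
        // omitted here to keep the sketch short.
    }
}
```

In the real change the periodic check runs on the Ccr.CCR_THREAD_POOL_NAME thread pool, the leader's global checkpoint is read with an IndicesStatsRequest against the leader shard, and cancellation is driven by task::isRunning rather than a plain BooleanSupplier.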
@@ -30,7 +30,6 @@
import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.common.util.concurrent.ThreadContext;
@@ -56,7 +55,6 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
@@ -69,7 +67,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
static final int PROCESSOR_RETRY_LIMIT = 16;
static final int DEFAULT_CONCURRENT_PROCESSORS = 1;
static final long DEFAULT_MAX_TRANSLOG_BYTES= Long.MAX_VALUE;
private static final TimeValue RETRY_TIMEOUT = TimeValue.timeValueMillis(500);
private static final TimeValue CHECK_LEADER_GLOBAL_CHECKPOINT_INTERVAL = TimeValue.timeValueSeconds(3);

private final Client client;
private final ThreadPool threadPool;
@@ -130,58 +128,20 @@ protected void nodeOperation(AllocatedPersistentTask task, ShardFollowTask param
void prepare(Client leaderClient, Client followerClient, ShardFollowNodeTask task, ShardFollowTask params,
long followGlobalCheckPoint,
IndexMetadataVersionChecker imdVersionChecker) {
if (task.isRunning() == false) {
// TODO: need better cancellation control
return;
}

final ShardId leaderShard = params.getLeaderShardId();
final ShardId followerShard = params.getFollowShardId();
fetchGlobalCheckpoint(leaderClient, leaderShard, leaderGlobalCheckPoint -> {
// TODO: check if both indices have the same history uuid
if (leaderGlobalCheckPoint == followGlobalCheckPoint) {
logger.debug("{} no write operations to fetch", followerShard);
retry(leaderClient, followerClient, task, params, followGlobalCheckPoint, imdVersionChecker);
} else {
assert followGlobalCheckPoint < leaderGlobalCheckPoint : "followGlobalCheckPoint [" + followGlobalCheckPoint +
"] is not below leaderGlobalCheckPoint [" + leaderGlobalCheckPoint + "]";
logger.debug("{} fetching write operations, leaderGlobalCheckPoint={}, followGlobalCheckPoint={}", followerShard,
leaderGlobalCheckPoint, followGlobalCheckPoint);
Executor ccrExecutor = threadPool.executor(Ccr.CCR_THREAD_POOL_NAME);
Consumer<Exception> handler = e -> {
if (e == null) {
task.updateProcessedGlobalCheckpoint(leaderGlobalCheckPoint);
prepare(leaderClient, followerClient, task, params, leaderGlobalCheckPoint, imdVersionChecker);
} else {
task.markAsFailed(e);
}
};
ChunksCoordinator coordinator = new ChunksCoordinator(followerClient, leaderClient, ccrExecutor, imdVersionChecker,
params.getMaxChunkSize(), params.getNumConcurrentChunks(), params.getProcessorMaxTranslogBytes(), leaderShard,
followerShard, handler);
coordinator.createChucks(followGlobalCheckPoint, leaderGlobalCheckPoint);
coordinator.start();
}
logger.debug("{} fetching write operations, leaderGlobalCheckPoint={}, followGlobalCheckPoint={}", followerShard,
leaderGlobalCheckPoint, followGlobalCheckPoint);
ChunksCoordinator coordinator = new ChunksCoordinator(followerClient, leaderClient, threadPool, imdVersionChecker,
params.getMaxChunkSize(), params.getNumConcurrentChunks(), params.getProcessorMaxTranslogBytes(), leaderShard,
followerShard, task::markAsFailed, task::isRunning, task::updateProcessedGlobalCheckpoint);
coordinator.start(followGlobalCheckPoint, leaderGlobalCheckPoint);
}, task::markAsFailed);
}

private void retry(Client leaderClient, Client followerClient, ShardFollowNodeTask task, ShardFollowTask params,
long followGlobalCheckPoint,
IndexMetadataVersionChecker imdVersionChecker) {
threadPool.schedule(RETRY_TIMEOUT, Ccr.CCR_THREAD_POOL_NAME, new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
task.markAsFailed(e);
}

@Override
protected void doRun() throws Exception {
prepare(leaderClient, followerClient, task, params, followGlobalCheckPoint, imdVersionChecker);
}
});
}

private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer handler, Consumer<Exception> errorHandler) {
private static void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer handler, Consumer<Exception> errorHandler) {
client.admin().indices().stats(new IndicesStatsRequest().indices(shardId.getIndexName()), ActionListener.wrap(r -> {
IndexStats indexStats = r.getIndex(shardId.getIndexName());
Optional<ShardStats> filteredShardStats = Arrays.stream(indexStats.getShards())
@@ -201,37 +161,53 @@ private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer
static class ChunksCoordinator {

private static final Logger LOGGER = Loggers.getLogger(ChunksCoordinator.class);

private final Client followerClient;
private final Client leaderClient;
private final ThreadPool threadPool;
private final Executor ccrExecutor;
private final IndexMetadataVersionChecker imdVersionChecker;

private final long batchSize;
private final int concurrentProcessors;
private final int maxConcurrentWorker;
private final long processorMaxTranslogBytes;
private final ShardId leaderShard;
private final ShardId followerShard;
private final Consumer<Exception> handler;

private final CountDown countDown;
private final Queue<long[]> chunks = new ConcurrentLinkedQueue<>();
private final AtomicReference<Exception> failureHolder = new AtomicReference<>();

ChunksCoordinator(Client followerClient, Client leaderClient, Executor ccrExecutor, IndexMetadataVersionChecker imdVersionChecker,
long batchSize, int concurrentProcessors, long processorMaxTranslogBytes, ShardId leaderShard,
ShardId followerShard, Consumer<Exception> handler) {
private final Consumer<Exception> failureHandler;
private final Supplier<Boolean> stateSupplier;
private final LongConsumer processedGlobalCheckpointUpdater;

private final AtomicInteger activeWorkers;
private final AtomicLong lastProcessedGlobalCheckpoint;
private final Queue<long[]> chunkWorkerQueue = new ConcurrentLinkedQueue<>();

ChunksCoordinator(Client followerClient,
Client leaderClient,
ThreadPool threadPool,
IndexMetadataVersionChecker imdVersionChecker,
long batchSize,
int maxConcurrentWorker,
long processorMaxTranslogBytes,
ShardId leaderShard,
ShardId followerShard,
Consumer<Exception> failureHandler,
Supplier<Boolean> runningSuppler,
LongConsumer processedGlobalCheckpointUpdater) {
this.followerClient = followerClient;
this.leaderClient = leaderClient;
this.ccrExecutor = ccrExecutor;
this.threadPool = threadPool;
this.imdVersionChecker = imdVersionChecker;
this.ccrExecutor = threadPool.executor(Ccr.CCR_THREAD_POOL_NAME);
this.batchSize = batchSize;
this.concurrentProcessors = concurrentProcessors;
this.maxConcurrentWorker = maxConcurrentWorker;
this.processorMaxTranslogBytes = processorMaxTranslogBytes;
this.leaderShard = leaderShard;
this.followerShard = followerShard;
this.handler = handler;
this.countDown = new CountDown(concurrentProcessors);
this.failureHandler = failureHandler;
this.stateSupplier = runningSuppler;
this.processedGlobalCheckpointUpdater = processedGlobalCheckpointUpdater;
this.activeWorkers = new AtomicInteger();
this.lastProcessedGlobalCheckpoint = new AtomicLong();
}

/**
@@ -244,69 +220,113 @@ void createChucks(final long from, final long to) {
LOGGER.debug("{} Creating chunks for operation range [{}] to [{}]", leaderShard, from, to);
for (long i = from; i < to; i += batchSize) {
long v2 = i + batchSize <= to ? i + batchSize - 1 : to;
chunks.add(new long[]{i, v2});
chunkWorkerQueue.add(new long[]{i, v2});
}
}

void start() {
void updateChunksQueue(long previousGlobalcheckpoint) {
schedule(CHECK_LEADER_GLOBAL_CHECKPOINT_INTERVAL, () -> {
if (stateSupplier.get() == false) {
chunkWorkerQueue.clear();
return;
}

fetchGlobalCheckpoint(leaderClient, leaderShard, currentGlobalCheckPoint -> {
if (currentGlobalCheckPoint != previousGlobalcheckpoint) {
assert previousGlobalcheckpoint < currentGlobalCheckPoint : "followGlobalCheckPoint [" + previousGlobalcheckpoint +
"] is not below leaderGlobalCheckPoint [" + currentGlobalCheckPoint + "]";
createChucks(previousGlobalcheckpoint, currentGlobalCheckPoint);
initiateChunkWorkers();
updateChunksQueue(currentGlobalCheckPoint);
} else {
LOGGER.debug("{} no write operations to fetch", followerShard);
updateChunksQueue(previousGlobalcheckpoint);
}
}, failureHandler);
});
}

void start(long followerGlobalCheckpoint, long leaderGlobalCheckPoint) {
createChucks(followerGlobalCheckpoint, leaderGlobalCheckPoint);
LOGGER.debug("{} Start coordination of [{}] chunks with [{}] concurrent processors",
leaderShard, chunks.size(), concurrentProcessors);
for (int i = 0; i < concurrentProcessors; i++) {
leaderShard, chunkWorkerQueue.size(), maxConcurrentWorker);
initiateChunkWorkers();
updateChunksQueue(leaderGlobalCheckPoint);
}

void initiateChunkWorkers() {
int workersToStart = maxConcurrentWorker - activeWorkers.get();
if (workersToStart == 0) {
LOGGER.debug("{} No new chunk workers were started", followerShard);
return;
}

LOGGER.debug("{} Starting [{}] new chunk workers", followerShard, workersToStart);
for (int i = 0; i < workersToStart; i++) {
ccrExecutor.execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
assert e != null;
LOGGER.error(() -> new ParameterizedMessage("{} Failure starting processor", leaderShard), e);
postProcessChuck(e);
LOGGER.error(() -> new ParameterizedMessage("{} Failure starting processor", followerShard), e);
failureHandler.accept(e);
}

@Override
protected void doRun() throws Exception {
processNextChunk();
}
});
activeWorkers.incrementAndGet();
}
}

void processNextChunk() {
long[] chunk = chunks.poll();
long[] chunk = chunkWorkerQueue.poll();
if (chunk == null) {
postProcessChuck(null);
int activeWorkers = this.activeWorkers.decrementAndGet();
LOGGER.debug("{} No more chunks to process, active workers [{}]", leaderShard, activeWorkers);
return;
}
LOGGER.debug("{} Processing chunk [{}/{}]", leaderShard, chunk[0], chunk[1]);
Consumer<Exception> processorHandler = e -> {
if (e == null) {
LOGGER.debug("{} Successfully processed chunk [{}/{}]", leaderShard, chunk[0], chunk[1]);
if (lastProcessedGlobalCheckpoint.updateAndGet(x -> x < chunk[1] ? chunk[1] : x) == chunk[1]) {
processedGlobalCheckpointUpdater.accept(chunk[1]);
}
processNextChunk();
} else {
LOGGER.error(() -> new ParameterizedMessage("{} Failure processing chunk [{}/{}]",
leaderShard, chunk[0], chunk[1]), e);
postProcessChuck(e);
failureHandler.accept(e);
}
};
ChunkProcessor processor = new ChunkProcessor(leaderClient, followerClient, chunks, ccrExecutor, imdVersionChecker,
ChunkWorker worker = new ChunkWorker(leaderClient, followerClient, chunkWorkerQueue, ccrExecutor, imdVersionChecker,
leaderShard, followerShard, processorHandler);
processor.start(chunk[0], chunk[1], processorMaxTranslogBytes);
worker.start(chunk[0], chunk[1], processorMaxTranslogBytes);
}

void postProcessChuck(Exception e) {
if (failureHolder.compareAndSet(null, e) == false) {
Exception firstFailure = failureHolder.get();
firstFailure.addSuppressed(e);
}
if (countDown.countDown()) {
handler.accept(failureHolder.get());
}
void schedule(TimeValue delay, Runnable runnable) {
threadPool.schedule(delay, Ccr.CCR_THREAD_POOL_NAME, new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
failureHandler.accept(e);
}

@Override
protected void doRun() throws Exception {
runnable.run();
}
});
}

Queue<long[]> getChunks() {
return chunks;
Queue<long[]> getChunkWorkerQueue() {
return chunkWorkerQueue;
}

}

static class ChunkProcessor {
static class ChunkWorker {

private final Client leaderClient;
private final Client followerClient;
@@ -319,9 +339,9 @@ static class ChunkProcessor {
private final Consumer<Exception> handler;
final AtomicInteger retryCounter = new AtomicInteger(0);

ChunkProcessor(Client leaderClient, Client followerClient, Queue<long[]> chunks, Executor ccrExecutor,
BiConsumer<Long, Consumer<Exception>> indexVersionChecker,
ShardId leaderShard, ShardId followerShard, Consumer<Exception> handler) {
ChunkWorker(Client leaderClient, Client followerClient, Queue<long[]> chunks, Executor ccrExecutor,
BiConsumer<Long, Consumer<Exception>> indexVersionChecker, ShardId leaderShard, ShardId followerShard,
Consumer<Exception> handler) {
this.leaderClient = leaderClient;
this.followerClient = followerClient;
this.chunks = chunks;
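As a quick reference for how the createChucks loop (original spelling kept from the diff) slices a checkpoint range into inclusive [start, end] pairs, here is a standalone snippet with hypothetical numbers; the values are only for illustration and are not taken from the PR.

```java
// Standalone illustration of the chunking arithmetic in createChucks, with made-up inputs.
public class ChunkRangesExample {
    public static void main(String[] args) {
        long from = 1000, to = 2000, batchSize = 256; // hypothetical values
        for (long i = from; i < to; i += batchSize) {
            // Each chunk is an inclusive [start, end] pair; the final chunk is clamped to 'to'.
            long end = i + batchSize <= to ? i + batchSize - 1 : to;
            System.out.println("[" + i + ", " + end + "]");
        }
        // Prints: [1000, 1255] [1256, 1511] [1512, 1767] [1768, 2000]
    }
}
```

The same arithmetic runs both in start(followerGlobalCheckpoint, leaderGlobalCheckPoint) and each time the periodic check observes that the leader's global checkpoint has advanced.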