From 516dcb7a9455beec699c7f9c29e15058a6efb660 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 22 Jun 2018 10:55:29 +0200 Subject: [PATCH] Rewrite shard follow node task logic The current shard follow mechanism is complex and does not give us easy ways to have visibility into the system (e.g. why we are falling behind). The main reason why it is complex is that the current design is highly asynchronous. Also, in the current model it is hard to apply backpressure other than by reducing the concurrent reads from the leader shard. This PR has the following changes: * Rewrote the shard follow task to coordinate the shard follow mechanism between a leader and follow shard in a single-threaded manner. This allows for better unit testing and makes it easier to add stats. * All write operations read from the shard changes api should be added to a buffer instead of directly sending them to the bulk shard operations api. This allows backpressure to be applied. In this PR there is a limit that controls how many write ops are allowed in the buffer, after which no new reads will be performed until the number of ops is below that limit. * The shard changes api includes the current global checkpoint on the leader shard copy. This allows reading to be a more self-sufficient process, instead of relying on a background thread to fetch the leader shard's global checkpoint. * Reading write operations from the leader shard (via shard changes api) is a separate step from writing the write operations (via bulk shards operations api), whereas before a read would immediately result in a write. * The bulk shard operations api returns the local checkpoint on the follow primary shard, to keep the shard follow task up to date with what has been written. * Moved the shard follow logic that was previously in ShardFollowTasksExecutor to ShardFollowNodeTask. * Moved over the changes from #31242 to make the shard follow mechanism resilient to node and shard failures. 
Relates to #30086 --- .../xpack/ccr/action/FollowIndexAction.java | 87 +++- .../xpack/ccr/action/ShardChangesAction.java | 52 ++- .../xpack/ccr/action/ShardFollowNodeTask.java | 386 +++++++++++++++- .../xpack/ccr/action/ShardFollowTask.java | 157 +++++-- .../ccr/action/ShardFollowTasksExecutor.java | 418 +----------------- .../bulk/BulkShardOperationsResponse.java | 27 ++ .../TransportBulkShardOperationsAction.java | 3 +- .../xpack/ccr/rest/RestFollowIndexAction.java | 18 +- .../xpack/ccr/ShardChangesIT.java | 242 ++++++++-- .../ccr/action/ChunksCoordinatorTests.java | 407 ----------------- .../ccr/action/FollowIndexRequestTests.java | 8 +- .../ccr/action/ShardChangesActionTests.java | 8 +- .../ccr/action/ShardChangesRequestTests.java | 3 +- .../ccr/action/ShardChangesResponseTests.java | 3 +- .../ShardFollowNodeTaskStatusTests.java | 3 +- .../ccr/action/ShardFollowNodeTaskTests.java | 232 ++++++++++ .../ccr/action/ShardFollowTaskTests.java | 15 +- 17 files changed, 1108 insertions(+), 961 deletions(-) delete mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java index 21c56b53f2e62..499d93023e459 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java @@ -71,9 +71,12 @@ public static class Request extends ActionRequest { private String leaderIndex; private String followIndex; - private long batchSize = ShardFollowTasksExecutor.DEFAULT_BATCH_SIZE; - private int concurrentProcessors = ShardFollowTasksExecutor.DEFAULT_CONCURRENT_PROCESSORS; - private long 
processorMaxTranslogBytes = ShardFollowTasksExecutor.DEFAULT_MAX_TRANSLOG_BYTES; + private int maxReadSize = ShardFollowNodeTask.DEFAULT_MAX_READ_SIZE; + private int maxConcurrentReads = ShardFollowNodeTask.DEFAULT_MAX_CONCURRENT_READS; + private long processorMaxTranslogBytes = ShardFollowNodeTask.DEFAULT_MAX_TRANSLOG_BYTES; + private int maxWriteSize = ShardFollowNodeTask.DEFAULT_MAX_WRITE_SIZE; + private int maxConcurrentWrites = ShardFollowNodeTask.DEFAULT_MAX_CONCURRENT_WRITES; + private int maxBufferSize = ShardFollowNodeTask.DEFAULT_MAX_BUFFER_SIZE; public String getLeaderIndex() { return leaderIndex; @@ -91,23 +94,23 @@ public void setFollowIndex(String followIndex) { this.followIndex = followIndex; } - public long getBatchSize() { - return batchSize; + public int getMaxReadSize() { + return maxReadSize; } - public void setBatchSize(long batchSize) { - if (batchSize < 1) { - throw new IllegalArgumentException("Illegal batch_size [" + batchSize + "]"); + public void setMaxReadSize(int maxReadSize) { + if (maxReadSize < 1) { + throw new IllegalArgumentException("Illegal batch_size [" + maxReadSize + "]"); } - this.batchSize = batchSize; + this.maxReadSize = maxReadSize; } - public void setConcurrentProcessors(int concurrentProcessors) { - if (concurrentProcessors < 1) { + public void setMaxConcurrentReads(int maxConcurrentReads) { + if (maxConcurrentReads < 1) { throw new IllegalArgumentException("concurrent_processors must be larger than 0"); } - this.concurrentProcessors = concurrentProcessors; + this.maxConcurrentReads = maxConcurrentReads; } public void setProcessorMaxTranslogBytes(long processorMaxTranslogBytes) { @@ -117,6 +120,39 @@ public void setProcessorMaxTranslogBytes(long processorMaxTranslogBytes) { this.processorMaxTranslogBytes = processorMaxTranslogBytes; } + public int getMaxWriteSize() { + return maxWriteSize; + } + + public void setMaxWriteSize(int maxWriteSize) { + if (maxWriteSize < 1) { + throw new IllegalArgumentException("maxWriteSize 
must be larger than 0"); + } + this.maxWriteSize = maxWriteSize; + } + + public int getMaxConcurrentWrites() { + return maxConcurrentWrites; + } + + public void setMaxConcurrentWrites(int maxConcurrentWrites) { + if (maxConcurrentWrites < 1) { + throw new IllegalArgumentException("maxConcurrentWrites must be larger than 0"); + } + this.maxConcurrentWrites = maxConcurrentWrites; + } + + public int getMaxBufferSize() { + return maxBufferSize; + } + + public void setMaxBufferSize(int maxBufferSize) { + if (maxBufferSize < 1) { + throw new IllegalArgumentException("maxBufferSize must be larger than 0"); + } + this.maxBufferSize = maxBufferSize; + } + @Override public ActionRequestValidationException validate() { return null; @@ -127,9 +163,12 @@ public void readFrom(StreamInput in) throws IOException { super.readFrom(in); leaderIndex = in.readString(); followIndex = in.readString(); - batchSize = in.readVLong(); - concurrentProcessors = in.readVInt(); + maxReadSize = in.readVInt(); + maxConcurrentReads = in.readVInt(); processorMaxTranslogBytes = in.readVLong(); + maxWriteSize = in.readVInt(); + maxConcurrentWrites = in.readVInt(); + maxBufferSize = in.readVInt(); } @Override @@ -137,9 +176,12 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(leaderIndex); out.writeString(followIndex); - out.writeVLong(batchSize); - out.writeVInt(concurrentProcessors); + out.writeVInt(maxReadSize); + out.writeVInt(maxConcurrentReads); out.writeVLong(processorMaxTranslogBytes); + out.writeVInt(maxWriteSize); + out.writeVInt(maxConcurrentWrites); + out.writeVInt(maxBufferSize); } @Override @@ -147,16 +189,20 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Request request = (Request) o; - return batchSize == request.batchSize && - concurrentProcessors == request.concurrentProcessors && + return maxReadSize == request.maxReadSize && + maxConcurrentReads == 
request.maxConcurrentReads && processorMaxTranslogBytes == request.processorMaxTranslogBytes && + maxWriteSize == request.maxWriteSize && + maxConcurrentWrites == request.maxConcurrentWrites && + maxBufferSize == request.maxBufferSize && Objects.equals(leaderIndex, request.leaderIndex) && Objects.equals(followIndex, request.followIndex); } @Override public int hashCode() { - return Objects.hash(leaderIndex, followIndex, batchSize, concurrentProcessors, processorMaxTranslogBytes); + return Objects.hash(leaderIndex, followIndex, maxReadSize, maxConcurrentReads, processorMaxTranslogBytes, + maxWriteSize, maxConcurrentWrites, maxBufferSize); } } @@ -254,7 +300,8 @@ void start(Request request, String clusterNameAlias, IndexMetaData leaderIndexMe ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias, new ShardId(followIndexMetadata.getIndex(), shardId), new ShardId(leaderIndexMetadata.getIndex(), shardId), - request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes, filteredHeaders); + request.maxReadSize, request.maxConcurrentReads, request.processorMaxTranslogBytes, + request.maxWriteSize, request.maxConcurrentWrites, request.maxBufferSize, filteredHeaders); persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask, new ActionListener>() { @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index c72b13a701f41..b746f8dcd4225 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -54,9 +54,9 @@ public Response newResponse() { public static class Request extends SingleShardRequest { private long minSeqNo; - private long maxSeqNo; + private Long maxSeqNo; private ShardId shardId; - private long 
maxTranslogsBytes = ShardFollowTasksExecutor.DEFAULT_MAX_TRANSLOG_BYTES; + private long maxTranslogsBytes = ShardFollowNodeTask.DEFAULT_MAX_TRANSLOG_BYTES; public Request(ShardId shardId) { super(shardId.getIndexName()); @@ -78,11 +78,11 @@ public void setMinSeqNo(long minSeqNo) { this.minSeqNo = minSeqNo; } - public long getMaxSeqNo() { + public Long getMaxSeqNo() { return maxSeqNo; } - public void setMaxSeqNo(long maxSeqNo) { + public void setMaxSeqNo(Long maxSeqNo) { this.maxSeqNo = maxSeqNo; } @@ -100,7 +100,7 @@ public ActionRequestValidationException validate() { if (minSeqNo < 0) { validationException = addValidationError("minSeqNo [" + minSeqNo + "] cannot be lower than 0", validationException); } - if (maxSeqNo < minSeqNo) { + if (maxSeqNo != null && maxSeqNo < minSeqNo) { validationException = addValidationError("minSeqNo [" + minSeqNo + "] cannot be larger than maxSeqNo [" + maxSeqNo + "]", validationException); } @@ -115,7 +115,7 @@ public ActionRequestValidationException validate() { public void readFrom(StreamInput in) throws IOException { super.readFrom(in); minSeqNo = in.readVLong(); - maxSeqNo = in.readVLong(); + maxSeqNo = in.readOptionalLong(); shardId = ShardId.readShardId(in); maxTranslogsBytes = in.readVLong(); } @@ -124,7 +124,7 @@ public void readFrom(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeVLong(minSeqNo); - out.writeVLong(maxSeqNo); + out.writeOptionalLong(maxSeqNo); shardId.writeTo(out); out.writeVLong(maxTranslogsBytes); } @@ -136,7 +136,7 @@ public boolean equals(final Object o) { if (o == null || getClass() != o.getClass()) return false; final Request request = (Request) o; return minSeqNo == request.minSeqNo && - maxSeqNo == request.maxSeqNo && + Objects.equals(maxSeqNo, request.maxSeqNo) && Objects.equals(shardId, request.shardId) && maxTranslogsBytes == request.maxTranslogsBytes; } @@ -150,13 +150,15 @@ public int hashCode() { public static final 
class Response extends ActionResponse { private long indexMetadataVersion; + private long leaderGlobalCheckpoint; private Translog.Operation[] operations; Response() { } - Response(long indexMetadataVersion, final Translog.Operation[] operations) { + Response(long indexMetadataVersion, long leaderGlobalCheckpoint, final Translog.Operation[] operations) { this.indexMetadataVersion = indexMetadataVersion; + this.leaderGlobalCheckpoint = leaderGlobalCheckpoint; this.operations = operations; } @@ -164,6 +166,10 @@ public long getIndexMetadataVersion() { return indexMetadataVersion; } + public long getLeaderGlobalCheckpoint() { + return leaderGlobalCheckpoint; + } + public Translog.Operation[] getOperations() { return operations; } @@ -172,6 +178,7 @@ public Translog.Operation[] getOperations() { public void readFrom(final StreamInput in) throws IOException { super.readFrom(in); indexMetadataVersion = in.readVLong(); + leaderGlobalCheckpoint = in.readZLong(); operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new); } @@ -179,6 +186,7 @@ public void readFrom(final StreamInput in) throws IOException { public void writeTo(final StreamOutput out) throws IOException { super.writeTo(out); out.writeVLong(indexMetadataVersion); + out.writeZLong(leaderGlobalCheckpoint); out.writeArray(Translog.Operation::writeOperation, operations); } @@ -188,13 +196,15 @@ public boolean equals(final Object o) { if (o == null || getClass() != o.getClass()) return false; final Response response = (Response) o; return indexMetadataVersion == response.indexMetadataVersion && - Arrays.equals(operations, response.operations); + leaderGlobalCheckpoint == response.leaderGlobalCheckpoint && + Arrays.equals(operations, response.operations); } @Override public int hashCode() { int result = 1; result += Objects.hashCode(indexMetadataVersion); + result += Objects.hashCode(leaderGlobalCheckpoint); result += Arrays.hashCode(operations); return result; } @@ -222,14 +232,16 @@ 
protected Response shardOperation(Request request, ShardId shardId) throws IOExc IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex()); IndexShard indexShard = indexService.getShard(request.getShard().id()); final long indexMetaDataVersion = clusterService.state().metaData().index(shardId.getIndex()).getVersion(); + // The following shard generates this request based on the global checkpoint on the primary copy on the leader. // Although this value might not have been synced to all replica copies on the leader, the requesting range // is guaranteed to be at most the local-checkpoint of any shard copies on the leader. - assert request.maxSeqNo <= indexShard.getLocalCheckpoint() : "invalid request from_seqno=[" + request.minSeqNo + "]," + - " to_seqno=[" + request.maxSeqNo + "], local_checkpoint=[" + indexShard.getLocalCheckpoint() + "]"; + assert request.maxSeqNo == null || request.maxSeqNo <= indexShard.getLocalCheckpoint() : "invalid request from_seqno=[" + + request.minSeqNo + "]," + " to_seqno=[" + request.maxSeqNo + "], local_checkpoint=[" + + indexShard.getLocalCheckpoint() + "]"; final Translog.Operation[] operations = getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo, request.maxTranslogsBytes); - return new Response(indexMetaDataVersion, operations); + return new Response(indexMetaDataVersion, indexShard.getGlobalCheckpoint(), operations); } @Override @@ -254,14 +266,15 @@ protected Response newResponse() { private static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0]; - static Translog.Operation[] getOperationsBetween(IndexShard indexShard, long minSeqNo, long maxSeqNo, + static Translog.Operation[] getOperationsBetween(IndexShard indexShard, long minSeqNo, Long maxSeqNo, long byteLimit) throws IOException { if (indexShard.state() != IndexShardState.STARTED) { throw new IndexShardNotStartedException(indexShard.shardId(), indexShard.state()); } int seenBytes = 0; final 
List operations = new ArrayList<>(); - try (Translog.Snapshot snapshot = indexShard.newLuceneChangesSnapshot("ccr", minSeqNo, maxSeqNo, true)) { + long max = maxSeqNo != null ? maxSeqNo : minSeqNo + 1000; + try (Translog.Snapshot snapshot = indexShard.newLuceneChangesSnapshot("ccr", minSeqNo, max, true)) { Translog.Operation op; while ((op = snapshot.next()) != null) { if (op.getSource() == null) { @@ -274,6 +287,15 @@ static Translog.Operation[] getOperationsBetween(IndexShard indexShard, long min break; } } + } catch (IllegalStateException e) { + // TODO: handle peek reads better. + // Should this optional upper bound leak into the newLuceneChangesSnapshot(...) method? + if (maxSeqNo != null) { + throw e; + } else if (e.getMessage().contains("prematurely terminated last_seen_seqno") == false) { + // Only fail if there are gaps between the ops. + throw e; + } } return operations.toArray(EMPTY_OPERATIONS_ARRAY); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 8777fd0eabe95..5fcfd44d27fd2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -5,28 +5,360 @@ */ package org.elasticsearch.xpack.ccr.action; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.NoShardAvailableActionException; +import org.elasticsearch.action.UnavailableShardsException; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import 
org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.transport.NetworkExceptionHelper; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.transport.ActionTransportException; +import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction; +import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest; +import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; import java.io.IOException; +import java.util.Arrays; +import java.util.LinkedList; import java.util.Map; import java.util.Objects; -import java.util.concurrent.atomic.AtomicLong; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.LongConsumer; +/** + * The node task that fetch the write operations from a leader shard and + * persists these ops in the follower shard. 
+ */ public class ShardFollowNodeTask extends AllocatedPersistentTask { - private final AtomicLong processedGlobalCheckpoint = new AtomicLong(); + static final int DEFAULT_MAX_READ_SIZE = 1024; + static final int DEFAULT_MAX_WRITE_SIZE = 1024; + static final int RETRY_LIMIT = 10; + static final int DEFAULT_MAX_CONCURRENT_READS = 1; + static final int DEFAULT_MAX_CONCURRENT_WRITES = 1; + static final int DEFAULT_MAX_BUFFER_SIZE = 10240; + static final long DEFAULT_MAX_TRANSLOG_BYTES= Long.MAX_VALUE; + private static final TimeValue RETRY_TIMEOUT = TimeValue.timeValueMillis(500); + + private static final Logger LOGGER = Loggers.getLogger(ShardFollowNodeTask.class); + + final Client leaderClient; + final Client followerClient; + private final ShardFollowTask params; + private final BiConsumer scheduler; + + private volatile long lastRequestedSeqno; + private volatile long leaderGlobalCheckpoint; + + private volatile int numConcurrentReads = 0; + private volatile int numConcurrentWrites = 0; + private volatile long processedGlobalCheckpoint = 0; + private volatile long currentIndexMetadataVersion = 0; + private final AtomicInteger retryCounter = new AtomicInteger(0); + private final Queue buffer = new LinkedList<>(); - ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map headers) { + ShardFollowNodeTask(long id, + String type, + String action, + String description, + TaskId parentTask, + Map headers, + Client leaderClient, + Client followerClient, + ShardFollowTask params, + BiConsumer scheduler) { super(id, type, action, description, parentTask, headers); + this.leaderClient = leaderClient; + this.followerClient = followerClient; + this.params = params; + this.scheduler = scheduler; + } + + void start(long leaderGlobalCheckpoint, long followGlobalCheckpoint) { + this.lastRequestedSeqno = followGlobalCheckpoint; + this.processedGlobalCheckpoint = followGlobalCheckpoint; + this.leaderGlobalCheckpoint = 
leaderGlobalCheckpoint; + + // Forcefully updates follower mapping, this gets us the leader imd version and + // makes sure that leader and follower mapping are identical. + updateMapping(imdVersion -> { + currentIndexMetadataVersion = imdVersion; + LOGGER.info("{} Started to follow leader shard {}, leaderGlobalCheckPoint={}, followGlobalCheckPoint={}", + params.getFollowShardId(), params.getLeaderShardId(), leaderGlobalCheckpoint, followGlobalCheckpoint); + coordinateReads(); + }); + } + + private synchronized void coordinateReads() { + if (isStopped()) { + LOGGER.info("{} shard follow task has been stopped", params.getFollowShardId()); + return; + } + + LOGGER.trace("{} coordinate reads, lastRequestedSeqno={}, leaderGlobalCheckpoint={}", + params.getFollowShardId(), lastRequestedSeqno, leaderGlobalCheckpoint); + final long maxReadSize = params.getMaxReadSize(); + final long maxConcurrentReads = params.getMaxConcurrentReads(); + if (lastRequestedSeqno < leaderGlobalCheckpoint) { + while (true) { + if (lastRequestedSeqno >= leaderGlobalCheckpoint) { + LOGGER.debug("{} no new reads to coordinate lastRequestedSeqno [{}] leaderGlobalCheckpoint [{}]", + params.getLeaderShardId(), lastRequestedSeqno, leaderGlobalCheckpoint); + break; + } + if (numConcurrentReads >= maxConcurrentReads) { + LOGGER.debug("{} no new reads, maximum number of concurrent reads have been reached [{}]", + params.getFollowShardId(), numConcurrentReads); + break; + } + if (buffer.size() > params.getMaxBufferSize()) { + LOGGER.debug("{} no new reads, buffer limit has been reached [{}]", params.getFollowShardId(), buffer.size()); + break; + } + numConcurrentReads++; + long from = lastRequestedSeqno + 1; + long to = from + maxReadSize <= leaderGlobalCheckpoint ? 
from + maxReadSize : leaderGlobalCheckpoint; + LOGGER.debug("{}[{}] read [{}/{}]", params.getFollowShardId(), numConcurrentReads, from, to); + sendShardChangesRequest(from, to); + lastRequestedSeqno = to; + } + if (numConcurrentReads == 0) { + LOGGER.debug("{} re-scheduling coordinate reads phase", params.getFollowShardId()); + scheduler.accept(TimeValue.timeValueMillis(500), this::coordinateReads); + } + } else { + if (numConcurrentReads == 0) { + LOGGER.debug("{} scheduling peek read", params.getFollowShardId()); + scheduler.accept(TimeValue.timeValueMillis(500), () -> { + synchronized (this) { + // We sneak peek if there is any thing new in the leader primary. + // If there is we will happily accept + numConcurrentReads++; + long from = lastRequestedSeqno + 1; + LOGGER.debug("{}[{}] peek read [{}]", params.getFollowShardId(), numConcurrentReads, from); + sendShardChangesRequest(from, null); + } + }); + } + } + } + + private synchronized void coordinateWrites() { + while (true) { + if (buffer.isEmpty()) { + LOGGER.debug("{} no writes to coordinate, because buffer is empty", params.getFollowShardId()); + break; + } + if (numConcurrentWrites >= params.getMaxConcurrentWrites()) { + LOGGER.debug("{} maximum number of concurrent writes have been reached [{}]", + params.getFollowShardId(), numConcurrentWrites); + break; + } + Translog.Operation[] ops = new Translog.Operation[Math.min(params.getMaxWriteSize(), buffer.size())]; + for (int i = 0; i < ops.length; i++) { + ops[i] = buffer.remove(); + } + numConcurrentWrites++; + LOGGER.debug("{}[{}] write [{}/{}] [{}]", params.getFollowShardId(), numConcurrentWrites, ops[0].seqNo(), + ops[ops.length - 1].seqNo(), ops.length); + sendBulkShardOperationsRequest(ops); + } + } + + private void sendShardChangesRequest(long from, Long to) { + innerSendShardChangesRequest(from, to, + response -> { + retryCounter.set(0); + handleResponse(from, to, response); + }, + e -> handleFailure(e, () -> sendShardChangesRequest(from, to))); + } 
+ + private synchronized void handleResponse(long from, Long to, ShardChangesAction.Response response) { + maybeUpdateMapping(response.getIndexMetadataVersion(), () -> { + synchronized (ShardFollowNodeTask.this) { + leaderGlobalCheckpoint = Math.max(leaderGlobalCheckpoint, response.getLeaderGlobalCheckpoint()); + if (response.getOperations().length == 0) { + numConcurrentReads--; + if (numConcurrentWrites == 0) { + coordinateWrites(); + } + coordinateReads(); + } else { + Translog.Operation firstOp = response.getOperations()[0]; + assert firstOp.seqNo() == from; + Translog.Operation lastOp = response.getOperations()[response.getOperations().length - 1]; + + LOGGER.debug("{} received [{}/{}]", params.getFollowShardId(), firstOp.seqNo(), lastOp.seqNo()); + buffer.addAll(Arrays.asList(response.getOperations())); + if (to == null) { + lastRequestedSeqno = Math.max(lastRequestedSeqno, lastOp.seqNo()); + LOGGER.debug("{} post updating lastRequestedSeqno to [{}]", params.getFollowShardId(), lastRequestedSeqno); + numConcurrentReads--; + } else { + if (lastOp.seqNo() < to) { + long newFrom = lastOp.seqNo() + 1; + LOGGER.debug("{} received [{}] as last op while [{}] was expected, continue to read [{}/{}]...", + params.getFollowShardId(), lastOp.seqNo(), to, newFrom, to); + sendShardChangesRequest(newFrom, to); + } else { + numConcurrentReads--; + } + } + if (numConcurrentWrites == 0) { + coordinateWrites(); + } + coordinateReads(); + } + assert numConcurrentReads >= 0; + } + }); + } + + private void sendBulkShardOperationsRequest(Translog.Operation[] operations) { + innerSendBulkShardOperationsRequest(operations, + followerLocalCheckpoint -> { + retryCounter.set(0); + handleResponse(followerLocalCheckpoint); + }, + e -> handleFailure(e, () -> sendBulkShardOperationsRequest(operations)) + ); + } + + private synchronized void handleResponse(long followerLocalCheckpoint) { + processedGlobalCheckpoint = Math.max(processedGlobalCheckpoint, followerLocalCheckpoint); + 
numConcurrentWrites--; + assert numConcurrentWrites >= 0; + coordinateWrites(); + } + + private void maybeUpdateMapping(Long minimumRequiredIndexMetadataVersion, Runnable task) { + assert Thread.holdsLock(this); + if (currentIndexMetadataVersion >= minimumRequiredIndexMetadataVersion) { + LOGGER.trace("{} index metadata version [{}] is higher or equal than minimum required index metadata version [{}]", + params.getFollowShardId(), currentIndexMetadataVersion, minimumRequiredIndexMetadataVersion); + task.run(); + } else { + LOGGER.debug("{} updating mapping, index metadata version [{}] is lower than minimum required index metadata version [{}]", + params.getFollowShardId(), currentIndexMetadataVersion, minimumRequiredIndexMetadataVersion); + updateMapping(imdVersion -> { + retryCounter.set(0); + currentIndexMetadataVersion = imdVersion; + task.run(); + }); + } + } + + private void handleFailure(Exception e, Runnable task) { + assert e != null; + if (shouldRetry(e)) { + if (isStopped() == false && retryCounter.incrementAndGet() <= RETRY_LIMIT) { + LOGGER.warn("error during follow shard task, retrying...", e); + scheduler.accept(RETRY_TIMEOUT, task); + } else { + markAsFailed(new ElasticsearchException("retrying failed [" + retryCounter.get() + + "] times, aborting...", e)); + } + } else { + markAsFailed(e); + } + } + + private boolean shouldRetry(Exception e) { + // TODO: What other exceptions should be retried? 
+ return NetworkExceptionHelper.isConnectException(e) || + NetworkExceptionHelper.isCloseConnectionException(e) || + e instanceof ActionTransportException || + e instanceof NodeClosedException || + e instanceof UnavailableShardsException || + e instanceof NoShardAvailableActionException; + } + + // These methods are protected for testing purposes: + protected void updateMapping(LongConsumer handler) { + Index leaderIndex = params.getLeaderShardId().getIndex(); + Index followIndex = params.getFollowShardId().getIndex(); + + ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.clear(); + clusterStateRequest.metaData(true); + clusterStateRequest.indices(leaderIndex.getName()); + + leaderClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> { + IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex); + assert indexMetaData.getMappings().size() == 1; + MappingMetaData mappingMetaData = indexMetaData.getMappings().iterator().next().value; + + PutMappingRequest putMappingRequest = new PutMappingRequest(followIndex.getName()); + putMappingRequest.type(mappingMetaData.type()); + putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON); + followerClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap( + putMappingResponse -> handler.accept(indexMetaData.getVersion()), + e -> handleFailure(e, () -> updateMapping(handler)))); + }, e -> handleFailure(e, () -> updateMapping(handler)))); + } + + protected void innerSendBulkShardOperationsRequest(Translog.Operation[] operations, + LongConsumer handler, + Consumer errorHandler) { + final BulkShardOperationsRequest request = new BulkShardOperationsRequest(params.getFollowShardId(), operations); + followerClient.execute(BulkShardOperationsAction.INSTANCE, request, + new ActionListener() { + @Override + public void onResponse(BulkShardOperationsResponse response) { + 
handler.accept(response.getLocalCheckpoint()); + } + + @Override + public void onFailure(Exception e) { + errorHandler.accept(e); + } + } + ); + } + + protected void innerSendShardChangesRequest(long from, + Long to, + Consumer handler, + Consumer errorHandler) { + ShardChangesAction.Request request = new ShardChangesAction.Request(params.getLeaderShardId()); + request.setMinSeqNo(from); + request.setMaxSeqNo(to); + request.setMaxTranslogsBytes(params.getMaxTranslogBytes()); + leaderClient.execute(ShardChangesAction.INSTANCE, request, new ActionListener() { + @Override + public void onResponse(ShardChangesAction.Response response) { + handler.accept(response); + } + + @Override + public void onFailure(Exception e) { + errorHandler.accept(e); + } + }); } @Override @@ -34,17 +366,13 @@ protected void onCancelled() { markAsCompleted(); } - public boolean isRunning() { - return isCancelled() == false && isCompleted() == false; - } - - void updateProcessedGlobalCheckpoint(long processedGlobalCheckpoint) { - this.processedGlobalCheckpoint.set(processedGlobalCheckpoint); + protected boolean isStopped() { + return isCancelled() || isCompleted(); } @Override - public Task.Status getStatus() { - return new Status(processedGlobalCheckpoint.get()); + public Status getStatus() { + return new Status(processedGlobalCheckpoint, numConcurrentReads, numConcurrentWrites); } public static class Status implements Task.Status { @@ -52,28 +380,46 @@ public static class Status implements Task.Status { public static final String NAME = "shard-follow-node-task-status"; static final ParseField PROCESSED_GLOBAL_CHECKPOINT_FIELD = new ParseField("processed_global_checkpoint"); + static final ParseField NUMBER_OF_CONCURRENT_READS_FIELD = new ParseField("number_of_concurrent_reads"); + static final ParseField NUMBER_OF_CONCURRENT_WRITES_FIELD = new ParseField("number_of_concurrent_writes"); static final ConstructingObjectParser PARSER = - new ConstructingObjectParser<>(NAME, args -> new 
Status((Long) args[0])); + new ConstructingObjectParser<>(NAME, args -> new Status((long) args[0], (int) args[1], (int) args[2])); static { PARSER.declareLong(ConstructingObjectParser.constructorArg(), PROCESSED_GLOBAL_CHECKPOINT_FIELD); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_READS_FIELD); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_WRITES_FIELD); } private final long processedGlobalCheckpoint; + private final int numberOfConcurrentReads; + private final int numberOfConcurrentWrites; - Status(long processedGlobalCheckpoint) { + Status(long processedGlobalCheckpoint, int numberOfConcurrentReads, int numberOfConcurrentWrites) { this.processedGlobalCheckpoint = processedGlobalCheckpoint; + this.numberOfConcurrentReads = numberOfConcurrentReads; + this.numberOfConcurrentWrites = numberOfConcurrentWrites; } public Status(StreamInput in) throws IOException { this.processedGlobalCheckpoint = in.readZLong(); + this.numberOfConcurrentReads = in.readVInt(); + this.numberOfConcurrentWrites = in.readVInt(); } public long getProcessedGlobalCheckpoint() { return processedGlobalCheckpoint; } + public int getNumberOfConcurrentReads() { + return numberOfConcurrentReads; + } + + public int getNumberOfConcurrentWrites() { + return numberOfConcurrentWrites; + } + @Override public String getWriteableName() { return NAME; @@ -82,6 +428,8 @@ public String getWriteableName() { @Override public void writeTo(StreamOutput out) throws IOException { out.writeZLong(processedGlobalCheckpoint); + out.writeVInt(numberOfConcurrentReads); + out.writeVInt(numberOfConcurrentWrites); } @Override @@ -90,6 +438,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws { builder.field(PROCESSED_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), processedGlobalCheckpoint); } + { + builder.field(NUMBER_OF_CONCURRENT_READS_FIELD.getPreferredName(), numberOfConcurrentReads); + } + { + 
builder.field(NUMBER_OF_CONCURRENT_WRITES_FIELD.getPreferredName(), numberOfConcurrentWrites); + } builder.endObject(); return builder; } @@ -103,12 +457,14 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Status status = (Status) o; - return processedGlobalCheckpoint == status.processedGlobalCheckpoint; + return processedGlobalCheckpoint == status.processedGlobalCheckpoint && + numberOfConcurrentReads == status.numberOfConcurrentReads && + numberOfConcurrentWrites == status.numberOfConcurrentWrites; } @Override public int hashCode() { - return Objects.hash(processedGlobalCheckpoint); + return Objects.hash(processedGlobalCheckpoint, numberOfConcurrentReads, numberOfConcurrentWrites); } public String toString() { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java index e97f8e7dc0c3c..a92172b390708 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java @@ -40,15 +40,18 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { static final ParseField LEADER_SHARD_INDEX_UUID_FIELD = new ParseField("leader_shard_index_uuid"); static final ParseField LEADER_SHARD_SHARDID_FIELD = new ParseField("leader_shard_shard"); static final ParseField HEADERS = new ParseField("headers"); - public static final ParseField MAX_CHUNK_SIZE = new ParseField("max_chunk_size"); - public static final ParseField NUM_CONCURRENT_CHUNKS = new ParseField("max_concurrent_chunks"); - public static final ParseField PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST = new ParseField("processor_max_translog_bytes"); + public static final ParseField MAX_READ_SIZE = new ParseField("max_read_size"); + public static final ParseField 
MAX_CONCURRENT_READS = new ParseField("max_concurrent_reads"); + public static final ParseField MAX_TRANSLOG_BYTES_PER_REQUEST = new ParseField("max_translog_bytes"); + public static final ParseField MAX_WRITE_SIZE = new ParseField("max_write_size"); + public static final ParseField MAX_CONCURRENT_WRITES = new ParseField("max_concurrent_writes"); + public static final ParseField MAX_BUFFER_SIZE = new ParseField("max_buffer_size"); @SuppressWarnings("unchecked") - public static ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, (a) -> new ShardFollowTask((String) a[0], new ShardId((String) a[1], (String) a[2], (int) a[3]), - new ShardId((String) a[4], (String) a[5], (int) a[6]), (long) a[7], (int) a[8], (long) a[9], - (Map) a[10])); + new ShardId((String) a[4], (String) a[5], (int) a[6]), (int) a[7], (int) a[8], (long) a[9], + (int) a[10], (int) a[11], (int) a[12], (Map) a[13])); static { PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), LEADER_CLUSTER_ALIAS_FIELD); @@ -58,28 +61,38 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_SHARD_INDEX_FIELD); PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_SHARD_INDEX_UUID_FIELD); PARSER.declareInt(ConstructingObjectParser.constructorArg(), LEADER_SHARD_SHARDID_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), MAX_CHUNK_SIZE); - PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUM_CONCURRENT_CHUNKS); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_READ_SIZE); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_CONCURRENT_READS); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), 
MAX_TRANSLOG_BYTES_PER_REQUEST); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_WRITE_SIZE); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_CONCURRENT_WRITES); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_BUFFER_SIZE); PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), HEADERS); } private final String leaderClusterAlias; private final ShardId followShardId; private final ShardId leaderShardId; - private final long maxChunkSize; - private final int numConcurrentChunks; - private final long processorMaxTranslogBytes; + private final int maxReadSize; + private final int maxConcurrentReads; + private final long maxTranslogBytes; + private final int maxWriteSize; + private final int maxConcurrentWrites; + private final int maxBufferSize; private final Map headers; - ShardFollowTask(String leaderClusterAlias, ShardId followShardId, ShardId leaderShardId, long maxChunkSize, - int numConcurrentChunks, long processorMaxTranslogBytes, Map headers) { + ShardFollowTask(String leaderClusterAlias, ShardId followShardId, ShardId leaderShardId, int maxReadSize, + int maxConcurrentReads, long maxTranslogBytes, int maxWriteSize, int maxConcurrentWrites, + int maxBufferSize, Map headers) { this.leaderClusterAlias = leaderClusterAlias; this.followShardId = followShardId; this.leaderShardId = leaderShardId; - this.maxChunkSize = maxChunkSize; - this.numConcurrentChunks = numConcurrentChunks; - this.processorMaxTranslogBytes = processorMaxTranslogBytes; + this.maxReadSize = maxReadSize; + this.maxConcurrentReads = maxConcurrentReads; + this.maxTranslogBytes = maxTranslogBytes; + this.maxWriteSize = maxWriteSize; + this.maxConcurrentWrites = maxConcurrentWrites; + this.maxBufferSize = maxBufferSize; this.headers = headers != null ? 
Collections.unmodifiableMap(headers) : Collections.emptyMap(); } @@ -87,9 +100,12 @@ public ShardFollowTask(StreamInput in) throws IOException { this.leaderClusterAlias = in.readOptionalString(); this.followShardId = ShardId.readShardId(in); this.leaderShardId = ShardId.readShardId(in); - this.maxChunkSize = in.readVLong(); - this.numConcurrentChunks = in.readVInt(); - this.processorMaxTranslogBytes = in.readVLong(); + this.maxReadSize = in.readVInt(); + this.maxConcurrentReads = in.readVInt(); + this.maxTranslogBytes = in.readVLong(); + this.maxWriteSize = in.readVInt(); + this.maxConcurrentWrites = in.readVInt(); + this.maxBufferSize = in.readVInt(); this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString)); } @@ -105,16 +121,28 @@ public ShardId getLeaderShardId() { return leaderShardId; } - public long getMaxChunkSize() { - return maxChunkSize; + public int getMaxReadSize() { + return maxReadSize; } - public int getNumConcurrentChunks() { - return numConcurrentChunks; + public int getMaxWriteSize() { + return maxWriteSize; } - public long getProcessorMaxTranslogBytes() { - return processorMaxTranslogBytes; + public int getMaxConcurrentReads() { + return maxConcurrentReads; + } + + public int getMaxConcurrentWrites() { + return maxConcurrentWrites; + } + + public int getMaxBufferSize() { + return maxBufferSize; + } + + public long getMaxTranslogBytes() { + return maxTranslogBytes; } public Map getHeaders() { @@ -131,9 +159,12 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(leaderClusterAlias); followShardId.writeTo(out); leaderShardId.writeTo(out); - out.writeVLong(maxChunkSize); - out.writeVInt(numConcurrentChunks); - out.writeVLong(processorMaxTranslogBytes); + out.writeVInt(maxReadSize); + out.writeVInt(maxConcurrentReads); + out.writeVLong(maxTranslogBytes); + out.writeVInt(maxWriteSize); + out.writeVInt(maxConcurrentWrites); + out.writeVInt(maxBufferSize); 
out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString); } @@ -144,19 +175,50 @@ public static ShardFollowTask fromXContent(XContentParser parser) { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - if (leaderClusterAlias != null) { - builder.field(LEADER_CLUSTER_ALIAS_FIELD.getPreferredName(), leaderClusterAlias); + { + if (leaderClusterAlias != null) { + builder.field(LEADER_CLUSTER_ALIAS_FIELD.getPreferredName(), leaderClusterAlias); + } + } + { + builder.field(FOLLOW_SHARD_INDEX_FIELD.getPreferredName(), followShardId.getIndex().getName()); + } + { + builder.field(FOLLOW_SHARD_INDEX_UUID_FIELD.getPreferredName(), followShardId.getIndex().getUUID()); + } + { + builder.field(FOLLOW_SHARD_SHARDID_FIELD.getPreferredName(), followShardId.id()); + } + { + builder.field(LEADER_SHARD_INDEX_FIELD.getPreferredName(), leaderShardId.getIndex().getName()); + } + { + builder.field(LEADER_SHARD_INDEX_UUID_FIELD.getPreferredName(), leaderShardId.getIndex().getUUID()); + } + { + builder.field(LEADER_SHARD_SHARDID_FIELD.getPreferredName(), leaderShardId.id()); + } + { + builder.field(MAX_READ_SIZE.getPreferredName(), maxReadSize); + } + { + builder.field(MAX_CONCURRENT_READS.getPreferredName(), maxConcurrentReads); + } + { + builder.field(MAX_TRANSLOG_BYTES_PER_REQUEST.getPreferredName(), maxTranslogBytes); + } + { + builder.field(MAX_WRITE_SIZE.getPreferredName(), maxWriteSize); + } + { + builder.field(MAX_CONCURRENT_WRITES.getPreferredName(), maxConcurrentWrites); + } + { + builder.field(MAX_BUFFER_SIZE.getPreferredName(), maxBufferSize); + } + { + builder.field(HEADERS.getPreferredName(), headers); } - builder.field(FOLLOW_SHARD_INDEX_FIELD.getPreferredName(), followShardId.getIndex().getName()); - builder.field(FOLLOW_SHARD_INDEX_UUID_FIELD.getPreferredName(), followShardId.getIndex().getUUID()); - builder.field(FOLLOW_SHARD_SHARDID_FIELD.getPreferredName(), 
followShardId.id()); - builder.field(LEADER_SHARD_INDEX_FIELD.getPreferredName(), leaderShardId.getIndex().getName()); - builder.field(LEADER_SHARD_INDEX_UUID_FIELD.getPreferredName(), leaderShardId.getIndex().getUUID()); - builder.field(LEADER_SHARD_SHARDID_FIELD.getPreferredName(), leaderShardId.id()); - builder.field(MAX_CHUNK_SIZE.getPreferredName(), maxChunkSize); - builder.field(NUM_CONCURRENT_CHUNKS.getPreferredName(), numConcurrentChunks); - builder.field(PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST.getPreferredName(), processorMaxTranslogBytes); - builder.field(HEADERS.getPreferredName(), headers); return builder.endObject(); } @@ -168,16 +230,19 @@ public boolean equals(Object o) { return Objects.equals(leaderClusterAlias, that.leaderClusterAlias) && Objects.equals(followShardId, that.followShardId) && Objects.equals(leaderShardId, that.leaderShardId) && - maxChunkSize == that.maxChunkSize && - numConcurrentChunks == that.numConcurrentChunks && - processorMaxTranslogBytes == that.processorMaxTranslogBytes && + maxReadSize == that.maxReadSize && + maxConcurrentReads == that.maxConcurrentReads && + maxWriteSize == that.maxWriteSize && + maxConcurrentWrites == that.maxConcurrentWrites && + maxTranslogBytes == that.maxTranslogBytes && + maxBufferSize == that.maxBufferSize && Objects.equals(headers, that.headers); } @Override public int hashCode() { - return Objects.hash(leaderClusterAlias, followShardId, leaderShardId, maxChunkSize, numConcurrentChunks, - processorMaxTranslogBytes, headers); + return Objects.hash(leaderClusterAlias, followShardId, leaderShardId, maxReadSize, maxConcurrentReads, + maxWriteSize, maxConcurrentWrites, maxTranslogBytes, maxBufferSize, headers); } public String toString() { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 895f53846f65a..16d5cfcb69ccc 100644 --- 
a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -5,13 +5,8 @@ */ package org.elasticsearch.xpack.ccr.action; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; -import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.admin.indices.stats.IndexStats; @@ -21,20 +16,11 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.FilterClient; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.NetworkExceptionHelper; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.common.util.concurrent.CountDown; -import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.Index; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.translog.Translog; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; @@ -42,20 +28,10 @@ import org.elasticsearch.tasks.TaskId; import 
org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ccr.Ccr; -import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction; -import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest; -import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; import java.util.Arrays; import java.util.Map; import java.util.Optional; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.LongConsumer; @@ -64,12 +40,6 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor { - static final long DEFAULT_BATCH_SIZE = 1024; - static final int PROCESSOR_RETRY_LIMIT = 16; - static final int DEFAULT_CONCURRENT_PROCESSORS = 1; - static final long DEFAULT_MAX_TRANSLOG_BYTES= Long.MAX_VALUE; - private static final TimeValue RETRY_TIMEOUT = TimeValue.timeValueMillis(500); - private final Client client; private final ThreadPool threadPool; @@ -100,86 +70,34 @@ public void validate(ShardFollowTask params, ClusterState clusterState) { protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId, PersistentTasksCustomMetaData.PersistentTask taskInProgress, Map headers) { - return new ShardFollowNodeTask(id, type, action, getDescription(taskInProgress), parentTaskId, headers); + ShardFollowTask params = taskInProgress.getParams(); + logger.info("{} Creating node task to track leader shard {}, params [{}]", + params.getFollowShardId(), params.getLeaderShardId(), params); + + final Client leaderClient; + if (params.getLeaderClusterAlias() != null) { + leaderClient = 
wrapClient(client.getRemoteClusterClient(params.getLeaderClusterAlias()), params); + } else { + leaderClient = wrapClient(client, params); + } + Client followerClient = wrapClient(client, params); + BiConsumer scheduler = + (delay, command) -> threadPool.schedule(delay, Ccr.CCR_THREAD_POOL_NAME, command); + return new ShardFollowNodeTask(id, type, action, getDescription(taskInProgress), parentTaskId, headers, + leaderClient, followerClient, params, scheduler); } @Override protected void nodeOperation(final AllocatedPersistentTask task, final ShardFollowTask params, final PersistentTaskState state) { ShardFollowNodeTask shardFollowNodeTask = (ShardFollowNodeTask) task; - Client leaderClient = wrapClient(params.getLeaderClusterAlias() != null ? - this.client.getRemoteClusterClient(params.getLeaderClusterAlias()) : this.client, params); - Client followerClient = wrapClient(this.client, params); - IndexMetadataVersionChecker imdVersionChecker = new IndexMetadataVersionChecker(params.getLeaderShardId().getIndex(), - params.getFollowShardId().getIndex(), client, leaderClient); - logger.info("[{}] initial leader mapping with follower mapping syncing", params); - imdVersionChecker.updateMapping(1L /* Force update, version is initially 0L */, e -> { - if (e == null) { - logger.info("Starting shard following [{}]", params); - fetchGlobalCheckpoint(followerClient, params.getFollowShardId(), - followGlobalCheckPoint -> { - shardFollowNodeTask.updateProcessedGlobalCheckpoint(followGlobalCheckPoint); - prepare(leaderClient, followerClient, shardFollowNodeTask, params, followGlobalCheckPoint, imdVersionChecker); - }, task::markAsFailed); - } else { - shardFollowNodeTask.markAsFailed(e); - } - }); - } - - void prepare(Client leaderClient, Client followerClient, ShardFollowNodeTask task, ShardFollowTask params, - long followGlobalCheckPoint, - IndexMetadataVersionChecker imdVersionChecker) { - if (task.isRunning() == false) { - // TODO: need better cancellation control - return; - 
} - - final ShardId leaderShard = params.getLeaderShardId(); - final ShardId followerShard = params.getFollowShardId(); - fetchGlobalCheckpoint(leaderClient, leaderShard, leaderGlobalCheckPoint -> { - // TODO: check if both indices have the same history uuid - if (leaderGlobalCheckPoint == followGlobalCheckPoint) { - logger.debug("{} no write operations to fetch", followerShard); - retry(leaderClient, followerClient, task, params, followGlobalCheckPoint, imdVersionChecker); - } else { - assert followGlobalCheckPoint < leaderGlobalCheckPoint : "followGlobalCheckPoint [" + followGlobalCheckPoint + - "] is not below leaderGlobalCheckPoint [" + leaderGlobalCheckPoint + "]"; - logger.debug("{} fetching write operations, leaderGlobalCheckPoint={}, followGlobalCheckPoint={}", followerShard, - leaderGlobalCheckPoint, followGlobalCheckPoint); - Executor ccrExecutor = threadPool.executor(Ccr.CCR_THREAD_POOL_NAME); - Consumer handler = e -> { - if (e == null) { - task.updateProcessedGlobalCheckpoint(leaderGlobalCheckPoint); - prepare(leaderClient, followerClient, task, params, leaderGlobalCheckPoint, imdVersionChecker); - } else { - task.markAsFailed(e); - } - }; - ChunksCoordinator coordinator = new ChunksCoordinator(followerClient, leaderClient, ccrExecutor, imdVersionChecker, - params.getMaxChunkSize(), params.getNumConcurrentChunks(), params.getProcessorMaxTranslogBytes(), leaderShard, - followerShard, handler); - coordinator.createChucks(followGlobalCheckPoint, leaderGlobalCheckPoint); - coordinator.start(); - } + logger.info("{} Started to track leader shard {}", params.getFollowShardId(), params.getLeaderShardId()); + fetchGlobalCheckpoint(shardFollowNodeTask.leaderClient, params.getLeaderShardId(), leaderGlobalCheckPoint -> { + fetchGlobalCheckpoint(shardFollowNodeTask.followerClient, params.getFollowShardId(), followGlobalCheckPoint -> { + shardFollowNodeTask.start(leaderGlobalCheckPoint, followGlobalCheckPoint); + }, task::markAsFailed); }, task::markAsFailed); } - 
private void retry(Client leaderClient, Client followerClient, ShardFollowNodeTask task, ShardFollowTask params, - long followGlobalCheckPoint, - IndexMetadataVersionChecker imdVersionChecker) { - threadPool.schedule(RETRY_TIMEOUT, Ccr.CCR_THREAD_POOL_NAME, new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - task.markAsFailed(e); - } - - @Override - protected void doRun() throws Exception { - prepare(leaderClient, followerClient, task, params, followGlobalCheckPoint, imdVersionChecker); - } - }); - } - private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer handler, Consumer errorHandler) { client.admin().indices().stats(new IndicesStatsRequest().indices(shardId.getIndexName()), ActionListener.wrap(r -> { IndexStats indexStats = r.getIndex(shardId.getIndexName()); @@ -197,229 +115,7 @@ private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer }, errorHandler)); } - static class ChunksCoordinator { - - private static final Logger LOGGER = Loggers.getLogger(ChunksCoordinator.class); - - private final Client followerClient; - private final Client leaderClient; - private final Executor ccrExecutor; - private final IndexMetadataVersionChecker imdVersionChecker; - - private final long batchSize; - private final int concurrentProcessors; - private final long processorMaxTranslogBytes; - private final ShardId leaderShard; - private final ShardId followerShard; - private final Consumer handler; - - private final CountDown countDown; - private final Queue chunks = new ConcurrentLinkedQueue<>(); - private final AtomicReference failureHolder = new AtomicReference<>(); - - ChunksCoordinator(Client followerClient, Client leaderClient, Executor ccrExecutor, IndexMetadataVersionChecker imdVersionChecker, - long batchSize, int concurrentProcessors, long processorMaxTranslogBytes, ShardId leaderShard, - ShardId followerShard, Consumer handler) { - this.followerClient = followerClient; - this.leaderClient = 
leaderClient; - this.ccrExecutor = ccrExecutor; - this.imdVersionChecker = imdVersionChecker; - this.batchSize = batchSize; - this.concurrentProcessors = concurrentProcessors; - this.processorMaxTranslogBytes = processorMaxTranslogBytes; - this.leaderShard = leaderShard; - this.followerShard = followerShard; - this.handler = handler; - this.countDown = new CountDown(concurrentProcessors); - } - - /** - * Creates chunks of the specified range, inclusive. - * - * @param from the lower end of the range (inclusive) - * @param to the upper end of the range (inclusive) - */ - void createChucks(final long from, final long to) { - LOGGER.debug("{} Creating chunks for operation range [{}] to [{}]", leaderShard, from, to); - for (long i = from; i < to; i += batchSize) { - long v2 = i + batchSize <= to ? i + batchSize - 1 : to; - chunks.add(new long[]{i, v2}); - } - } - - void start() { - LOGGER.debug("{} Start coordination of [{}] chunks with [{}] concurrent processors", - leaderShard, chunks.size(), concurrentProcessors); - for (int i = 0; i < concurrentProcessors; i++) { - ccrExecutor.execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - assert e != null; - LOGGER.error(() -> new ParameterizedMessage("{} Failure starting processor", leaderShard), e); - postProcessChuck(e); - } - - @Override - protected void doRun() throws Exception { - processNextChunk(); - } - }); - } - } - - void processNextChunk() { - long[] chunk = chunks.poll(); - if (chunk == null) { - postProcessChuck(null); - return; - } - LOGGER.debug("{} Processing chunk [{}/{}]", leaderShard, chunk[0], chunk[1]); - Consumer processorHandler = e -> { - if (e == null) { - LOGGER.debug("{} Successfully processed chunk [{}/{}]", leaderShard, chunk[0], chunk[1]); - processNextChunk(); - } else { - LOGGER.error(() -> new ParameterizedMessage("{} Failure processing chunk [{}/{}]", - leaderShard, chunk[0], chunk[1]), e); - postProcessChuck(e); - } - }; - ChunkProcessor processor = new 
ChunkProcessor(leaderClient, followerClient, chunks, ccrExecutor, imdVersionChecker, - leaderShard, followerShard, processorHandler); - processor.start(chunk[0], chunk[1], processorMaxTranslogBytes); - } - - void postProcessChuck(Exception e) { - if (failureHolder.compareAndSet(null, e) == false) { - Exception firstFailure = failureHolder.get(); - firstFailure.addSuppressed(e); - } - if (countDown.countDown()) { - handler.accept(failureHolder.get()); - } - } - - Queue getChunks() { - return chunks; - } - - } - - static class ChunkProcessor { - - private final Client leaderClient; - private final Client followerClient; - private final Queue chunks; - private final Executor ccrExecutor; - private final BiConsumer> indexVersionChecker; - - private final ShardId leaderShard; - private final ShardId followerShard; - private final Consumer handler; - final AtomicInteger retryCounter = new AtomicInteger(0); - - ChunkProcessor(Client leaderClient, Client followerClient, Queue chunks, Executor ccrExecutor, - BiConsumer> indexVersionChecker, - ShardId leaderShard, ShardId followerShard, Consumer handler) { - this.leaderClient = leaderClient; - this.followerClient = followerClient; - this.chunks = chunks; - this.ccrExecutor = ccrExecutor; - this.indexVersionChecker = indexVersionChecker; - this.leaderShard = leaderShard; - this.followerShard = followerShard; - this.handler = handler; - } - - void start(final long from, final long to, final long maxTranslogsBytes) { - ShardChangesAction.Request request = new ShardChangesAction.Request(leaderShard); - // Treat -1 as 0, because shard changes api min_seq_no is inclusive and therefore it doesn't allow a negative min_seq_no - // (If no indexing has happened in leader shard then global checkpoint is -1.) 
- request.setMinSeqNo(Math.max(0, from)); - request.setMaxSeqNo(to); - request.setMaxTranslogsBytes(maxTranslogsBytes); - leaderClient.execute(ShardChangesAction.INSTANCE, request, new ActionListener() { - @Override - public void onResponse(ShardChangesAction.Response response) { - handleResponse(to, response); - } - - @Override - public void onFailure(Exception e) { - assert e != null; - if (shouldRetry(e)) { - if (retryCounter.incrementAndGet() <= PROCESSOR_RETRY_LIMIT) { - start(from, to, maxTranslogsBytes); - } else { - handler.accept(new ElasticsearchException("retrying failed [" + retryCounter.get() + - "] times, aborting...", e)); - } - } else { - handler.accept(e); - } - } - }); - } - - void handleResponse(final long to, final ShardChangesAction.Response response) { - if (response.getOperations().length != 0) { - Translog.Operation lastOp = response.getOperations()[response.getOperations().length - 1]; - boolean maxByteLimitReached = lastOp.seqNo() < to; - if (maxByteLimitReached) { - // add a new entry to the queue for the operations that couldn't be fetched in the current shard changes api call: - chunks.add(new long[]{lastOp.seqNo() + 1, to}); - } - } - ccrExecutor.execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - assert e != null; - handler.accept(e); - } - - @Override - protected void doRun() throws Exception { - indexVersionChecker.accept(response.getIndexMetadataVersion(), e -> { - if (e != null) { - if (shouldRetry(e) && retryCounter.incrementAndGet() <= PROCESSOR_RETRY_LIMIT) { - handleResponse(to, response); - } else { - handler.accept(new ElasticsearchException("retrying failed [" + retryCounter.get() + - "] times, aborting...", e)); - } - return; - } - final BulkShardOperationsRequest request = new BulkShardOperationsRequest(followerShard, response.getOperations()); - followerClient.execute(BulkShardOperationsAction.INSTANCE, request, - new ActionListener() { - @Override - public void onResponse(final 
BulkShardOperationsResponse bulkShardOperationsResponse) { - handler.accept(null); - } - - @Override - public void onFailure(final Exception e) { - // No retry mechanism here, because if a failure is being redirected to this place it is considered - // non recoverable. - assert e != null; - handler.accept(e); - } - } - ); - }); - } - }); - } - - boolean shouldRetry(Exception e) { - // TODO: What other exceptions should be retried? - return NetworkExceptionHelper.isConnectException(e) || - NetworkExceptionHelper.isCloseConnectionException(e); - } - - } - - static Client wrapClient(Client client, ShardFollowTask shardFollowTask) { + private static Client wrapClient(Client client, ShardFollowTask shardFollowTask) { if (shardFollowTask.getHeaders().isEmpty()) { return client; } else { @@ -446,76 +142,4 @@ private static ThreadContext.StoredContext stashWithHeaders(ThreadContext thread return storedContext; } - static final class IndexMetadataVersionChecker implements BiConsumer> { - - private static final Logger LOGGER = Loggers.getLogger(IndexMetadataVersionChecker.class); - - private final Client followClient; - private final Client leaderClient; - private final Index leaderIndex; - private final Index followIndex; - private final AtomicLong currentIndexMetadataVersion; - private final Semaphore updateMappingSemaphore = new Semaphore(1); - - IndexMetadataVersionChecker(Index leaderIndex, Index followIndex, Client followClient, Client leaderClient) { - this.followClient = followClient; - this.leaderIndex = leaderIndex; - this.followIndex = followIndex; - this.leaderClient = leaderClient; - this.currentIndexMetadataVersion = new AtomicLong(); - } - - public void accept(Long minimumRequiredIndexMetadataVersion, Consumer handler) { - if (currentIndexMetadataVersion.get() >= minimumRequiredIndexMetadataVersion) { - LOGGER.debug("Current index metadata version [{}] is higher or equal than minimum required index metadata version [{}]", - currentIndexMetadataVersion.get(), 
minimumRequiredIndexMetadataVersion); - handler.accept(null); - } else { - updateMapping(minimumRequiredIndexMetadataVersion, handler); - } - } - - void updateMapping(long minimumRequiredIndexMetadataVersion, Consumer handler) { - try { - updateMappingSemaphore.acquire(); - } catch (InterruptedException e) { - handler.accept(e); - return; - } - if (currentIndexMetadataVersion.get() >= minimumRequiredIndexMetadataVersion) { - updateMappingSemaphore.release(); - LOGGER.debug("Current index metadata version [{}] is higher or equal than minimum required index metadata version [{}]", - currentIndexMetadataVersion.get(), minimumRequiredIndexMetadataVersion); - handler.accept(null); - return; - } - - ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); - clusterStateRequest.clear(); - clusterStateRequest.metaData(true); - clusterStateRequest.indices(leaderIndex.getName()); - - leaderClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> { - IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex); - assert indexMetaData.getMappings().size() == 1; - MappingMetaData mappingMetaData = indexMetaData.getMappings().iterator().next().value; - - PutMappingRequest putMappingRequest = new PutMappingRequest(followIndex.getName()); - putMappingRequest.type(mappingMetaData.type()); - putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON); - followClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap(putMappingResponse -> { - currentIndexMetadataVersion.set(indexMetaData.getVersion()); - updateMappingSemaphore.release(); - handler.accept(null); - }, e -> { - updateMappingSemaphore.release(); - handler.accept(e); - })); - }, e -> { - updateMappingSemaphore.release(); - handler.accept(e); - })); - } - } - } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsResponse.java 
b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsResponse.java index 62612e4bb4b3a..c4d26d96905b1 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsResponse.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsResponse.java @@ -7,12 +7,39 @@ import org.elasticsearch.action.support.WriteResponse; import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; public final class BulkShardOperationsResponse extends ReplicationResponse implements WriteResponse { + private long localCheckpoint; + + BulkShardOperationsResponse(long localCheckPoint) { + this.localCheckpoint = localCheckPoint; + } + + BulkShardOperationsResponse() { + } + + public long getLocalCheckpoint() { + return localCheckpoint; + } + @Override public void setForcedRefresh(final boolean forcedRefresh) { + } + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + localCheckpoint = in.readZLong(); } + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeZLong(localCheckpoint); + } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index 81cbf042037c1..9048d121a0e96 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -103,7 +103,8 @@ static WritePrimaryResult(replicaRequest, new BulkShardOperationsResponse(), location, null, primary, logger); + 
long localCheckPoint = primary.getLocalCheckpoint(); + return new WritePrimaryResult<>(replicaRequest, new BulkShardOperationsResponse(localCheckPoint), location, null, primary, logger); } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowIndexAction.java index 235308f902926..cc1a82f5787f5 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowIndexAction.java @@ -40,16 +40,22 @@ static Request createRequest(RestRequest restRequest) { Request request = new Request(); request.setLeaderIndex(restRequest.param("leader_index")); request.setFollowIndex(restRequest.param("index")); - if (restRequest.hasParam(ShardFollowTask.MAX_CHUNK_SIZE.getPreferredName())) { - request.setBatchSize(Long.valueOf(restRequest.param(ShardFollowTask.MAX_CHUNK_SIZE.getPreferredName()))); + if (restRequest.hasParam(ShardFollowTask.MAX_READ_SIZE.getPreferredName())) { + request.setMaxReadSize(Integer.valueOf(restRequest.param(ShardFollowTask.MAX_READ_SIZE.getPreferredName()))); } - if (restRequest.hasParam(ShardFollowTask.NUM_CONCURRENT_CHUNKS.getPreferredName())) { - request.setConcurrentProcessors(Integer.valueOf(restRequest.param(ShardFollowTask.NUM_CONCURRENT_CHUNKS.getPreferredName()))); + if (restRequest.hasParam(ShardFollowTask.MAX_CONCURRENT_READS.getPreferredName())) { + request.setMaxConcurrentReads(Integer.valueOf(restRequest.param(ShardFollowTask.MAX_CONCURRENT_READS.getPreferredName()))); } - if (restRequest.hasParam(ShardFollowTask.PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST.getPreferredName())) { - long value = Long.valueOf(restRequest.param(ShardFollowTask.PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST.getPreferredName())); + if (restRequest.hasParam(ShardFollowTask.MAX_TRANSLOG_BYTES_PER_REQUEST.getPreferredName())) { 
+ long value = Long.valueOf(restRequest.param(ShardFollowTask.MAX_TRANSLOG_BYTES_PER_REQUEST.getPreferredName())); request.setProcessorMaxTranslogBytes(value); } + if (restRequest.hasParam(ShardFollowTask.MAX_WRITE_SIZE.getPreferredName())) { + request.setMaxWriteSize(Integer.valueOf(restRequest.param(ShardFollowTask.MAX_WRITE_SIZE.getPreferredName()))); + } + if (restRequest.hasParam(ShardFollowTask.MAX_CONCURRENT_WRITES.getPreferredName())) { + request.setMaxConcurrentWrites(Integer.valueOf(restRequest.param(ShardFollowTask.MAX_CONCURRENT_WRITES.getPreferredName()))); + } return request; } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java index ba9855b58736e..7c58be62e5c5e 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java @@ -9,13 +9,20 @@ import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.analysis.common.CommonAnalysisPlugin; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import 
org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.support.XContentMapValues; @@ -24,6 +31,7 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.MockHttpTransport; @@ -37,6 +45,7 @@ import org.elasticsearch.xpack.core.XPackSettings; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -45,10 +54,14 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import static java.util.Collections.singletonMap; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -90,8 +103,8 @@ protected Collection> transportClientPlugins() { // this emulates what the CCR persistent task will do for pulling public void testGetOperationsBasedOnGlobalSequenceId() throws Exception { client().admin().indices().prepareCreate("index") - .setSettings(Settings.builder().put("index.number_of_shards", 1)) - .get(); + .setSettings(Settings.builder().put("index.number_of_shards", 1)) + .get(); client().prepareIndex("index", "doc", "1").setSource("{}", XContentType.JSON).get(); client().prepareIndex("index", "doc", "2").setSource("{}", XContentType.JSON).get(); @@ -147,9 +160,9 @@ public void 
testGetOperationsBasedOnGlobalSequenceId() throws Exception { public void testFollowIndex() throws Exception { final int numberOfPrimaryShards = randomIntBetween(1, 3); final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, - Collections.singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); - ensureGreen("index1"); + ensureYellow("index1"); final FollowIndexAction.Request followRequest = new FollowIndexAction.Request(); followRequest.setLeaderIndex("index1"); @@ -209,9 +222,9 @@ public void testFollowIndex() throws Exception { public void testSyncMappings() throws Exception { final String leaderIndexSettings = getIndexSettings(2, - Collections.singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); - ensureGreen("index1"); + ensureYellow("index1"); final FollowIndexAction.Request followRequest = new FollowIndexAction.Request(); followRequest.setLeaderIndex("index1"); @@ -229,7 +242,7 @@ public void testSyncMappings() throws Exception { assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(firstBatchNumDocs))); MappingMetaData mappingMetaData = client().admin().indices().prepareGetMappings("index2").get().getMappings() - .get("index2").get("doc"); + .get("index2").get("doc"); assertThat(XContentMapValues.extractValue("properties.f.type", mappingMetaData.sourceAsMap()), equalTo("integer")); assertThat(XContentMapValues.extractValue("properties.k", mappingMetaData.sourceAsMap()), nullValue()); @@ -240,41 +253,136 @@ public void testSyncMappings() throws Exception { } assertBusy(() -> 
assertThat(client().prepareSearch("index2").get().getHits().totalHits, - equalTo(firstBatchNumDocs + secondBatchNumDocs))); + equalTo(firstBatchNumDocs + secondBatchNumDocs))); mappingMetaData = client().admin().indices().prepareGetMappings("index2").get().getMappings() - .get("index2").get("doc"); + .get("index2").get("doc"); assertThat(XContentMapValues.extractValue("properties.f.type", mappingMetaData.sourceAsMap()), equalTo("integer")); assertThat(XContentMapValues.extractValue("properties.k.type", mappingMetaData.sourceAsMap()), equalTo("long")); + unfollowIndex("index2"); + } - final UnfollowIndexAction.Request unfollowRequest = new UnfollowIndexAction.Request(); - unfollowRequest.setFollowIndex("index2"); - client().execute(UnfollowIndexAction.INSTANCE, unfollowRequest).get(); + public void testFollowIndex_backlog() throws Exception { + String leaderIndexSettings = getIndexSettings(3, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); + BulkProcessor.Listener listener = new BulkProcessor.Listener() { + @Override + public void beforeBulk(long executionId, BulkRequest request) {} - assertBusy(() -> { - final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - assertThat(tasks.tasks().size(), equalTo(0)); + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {} - ListTasksRequest listTasksRequest = new ListTasksRequest(); - listTasksRequest.setDetailed(true); - ListTasksResponse listTasksResponse = client().admin().cluster().listTasks(listTasksRequest).get(); - int numNodeTasks = 0; - for (TaskInfo taskInfo : listTasksResponse.getTasks()) { - if (taskInfo.getAction().startsWith(ListTasksAction.NAME) == false) { - numNodeTasks++; 
+ @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) {} + }; + BulkProcessor bulkProcessor = BulkProcessor.builder(client(), listener) + .setBulkActions(100) + .setConcurrentRequests(4) + .build(); + AtomicBoolean run = new AtomicBoolean(true); + Thread thread = new Thread(() -> { + int counter = 0; + while (run.get()) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", counter++); + IndexRequest indexRequest = new IndexRequest("index1", "doc") + .source(source, XContentType.JSON) + .timeout(TimeValue.timeValueSeconds(1)); + bulkProcessor.add(indexRequest); + } + }); + thread.start(); + + // Waiting for some document being index before following the index: + int maxReadSize = randomIntBetween(128, 2048); + long numDocsIndexed = Math.min(3000 * 2, randomLongBetween(maxReadSize, maxReadSize * 10)); + atLeastDocsIndexed("index1", numDocsIndexed / 3); + + final FollowIndexAction.Request followRequest = new FollowIndexAction.Request(); + followRequest.setLeaderIndex("index1"); + followRequest.setFollowIndex("index2"); + followRequest.setMaxReadSize(maxReadSize); + followRequest.setMaxConcurrentReads(randomIntBetween(2, 10)); + followRequest.setMaxConcurrentWrites(randomIntBetween(2, 10)); + CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(); + createAndFollowRequest.setFollowRequest(followRequest); + client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get(); + + atLeastDocsIndexed("index1", numDocsIndexed); + run.set(false); + thread.join(); + assertThat(bulkProcessor.awaitClose(1L, TimeUnit.MINUTES), is(true)); + + assertSameDocCount("index1", "index2"); + unfollowIndex("index2"); + } + + public void testFollowIndexAndCloseNode() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(3); + String leaderIndexSettings = getIndexSettings(3, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + 
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); + + String followerIndexSettings = getIndexSettings(3, singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true")); + assertAcked(client().admin().indices().prepareCreate("index2").setSource(followerIndexSettings, XContentType.JSON)); + ensureGreen("index1", "index2"); + + AtomicBoolean run = new AtomicBoolean(true); + Thread thread = new Thread(() -> { + int counter = 0; + while (run.get()) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", counter++); + try { + client().prepareIndex("index1", "doc") + .setSource(source, XContentType.JSON) + .setTimeout(TimeValue.timeValueSeconds(1)) + .get(); + } catch (Exception e) { + logger.error("Error while indexing into leader index", e); } } - assertThat(numNodeTasks, equalTo(0)); }); + thread.start(); + + final FollowIndexAction.Request followRequest = new FollowIndexAction.Request(); + followRequest.setLeaderIndex("index1"); + followRequest.setFollowIndex("index2"); + followRequest.setMaxReadSize(randomIntBetween(32, 2048)); + followRequest.setMaxConcurrentReads(randomIntBetween(2, 10)); + client().execute(FollowIndexAction.INSTANCE, followRequest).get(); + + long maxNumDocsReplicated = Math.min(3000, randomLongBetween(followRequest.getMaxReadSize(), followRequest.getMaxReadSize() * 10)); + long minNumDocsReplicated = maxNumDocsReplicated / 3L; + logger.info("waiting for at least [{}] documents to be indexed and then stop a random data node", minNumDocsReplicated); + awaitBusy(() -> { + SearchRequest request = new SearchRequest("index2"); + request.source(new SearchSourceBuilder().size(0)); + SearchResponse response = client().search(request).actionGet(); + if (response.getHits().getTotalHits() >= minNumDocsReplicated) { + try { + internalCluster().stopRandomNonMasterNode(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return true; + } else { + return 
false; + } + }, 30, TimeUnit.SECONDS); + + logger.info("waiting for at least [{}] documents to be indexed", maxNumDocsReplicated); + atLeastDocsIndexed("index2", maxNumDocsReplicated); + run.set(false); + thread.join(); + + assertSameDocCount("index1", "index2"); + unfollowIndex("index2"); } public void testFollowIndexWithNestedField() throws Exception { final String leaderIndexSettings = - getIndexSettingsWithNestedMapping(1, Collections.singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + getIndexSettingsWithNestedMapping(1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); final String followerIndexSettings = - getIndexSettingsWithNestedMapping(1, Collections.singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true")); + getIndexSettingsWithNestedMapping(1, singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true")); assertAcked(client().admin().indices().prepareCreate("index2").setSource(followerIndexSettings, XContentType.JSON)); ensureGreen("index1", "index2"); @@ -311,16 +419,7 @@ public void testFollowIndexWithNestedField() throws Exception { equalTo(Collections.singletonList(value))); }); } - - final UnfollowIndexAction.Request unfollowRequest = new UnfollowIndexAction.Request(); - unfollowRequest.setFollowIndex("index2"); - client().execute(UnfollowIndexAction.INSTANCE, unfollowRequest).get(); - - assertBusy(() -> { - final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - assertThat(tasks.tasks().size(), equalTo(0)); - }); + unfollowIndex("index2"); } public void testUnfollowNonExistingIndex() { @@ -347,6 +446,43 @@ public void testFollowNonExistentIndex() throws Exception { 
expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest).actionGet()); } + public void testFollowIndex_lowMaxTranslogBytes() throws Exception { + final String leaderIndexSettings = getIndexSettings(1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); + ensureYellow("index1"); + + final int numDocs = 1024; + logger.info("Indexing [{}] docs", numDocs); + for (int i = 0; i < numDocs; i++) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); + client().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); + } + + final FollowIndexAction.Request followRequest = new FollowIndexAction.Request(); + followRequest.setProcessorMaxTranslogBytes(1024); + followRequest.setLeaderIndex("index1"); + followRequest.setFollowIndex("index2"); + + final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(); + createAndFollowRequest.setFollowRequest(followRequest); + client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get(); + + final Map firstBatchNumDocsPerShard = new HashMap<>(); + final ShardStats[] firstBatchShardStats = client().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); + for (final ShardStats shardStats : firstBatchShardStats) { + if (shardStats.getShardRouting().primary()) { + long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1; + firstBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value); + } + } + + assertBusy(assertTask(1, firstBatchNumDocsPerShard)); + for (int i = 0; i < numDocs; i++) { + assertBusy(assertExpectedDocumentRunnable(i)); + } + unfollowIndex("index2"); + } + private CheckedRunnable assertTask(final int numberOfPrimaryShards, final Map numDocsPerShard) { return 
() -> { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); @@ -362,7 +498,7 @@ private CheckedRunnable assertTask(final int numberOfPrimaryShards, f List taskInfos = listTasksResponse.getTasks(); assertThat(taskInfos.size(), equalTo(numberOfPrimaryShards)); Collection> shardFollowTasks = - taskMetadata.findTasks(ShardFollowTask.NAME, Objects::nonNull); + taskMetadata.findTasks(ShardFollowTask.NAME, Objects::nonNull); for (PersistentTasksCustomMetaData.PersistentTask shardFollowTask : shardFollowTasks) { final ShardFollowTask shardFollowTaskParams = (ShardFollowTask) shardFollowTask.getParams(); TaskInfo taskInfo = null; @@ -376,9 +512,9 @@ private CheckedRunnable assertTask(final int numberOfPrimaryShards, f assertThat(taskInfo, notNullValue()); ShardFollowNodeTask.Status status = (ShardFollowNodeTask.Status) taskInfo.getStatus(); assertThat(status, notNullValue()); - assertThat( - status.getProcessedGlobalCheckpoint(), - equalTo(numDocsPerShard.get(shardFollowTaskParams.getLeaderShardId()))); + assertThat("incorrect global checkpoint " + shardFollowTaskParams, + status.getProcessedGlobalCheckpoint(), + equalTo(numDocsPerShard.get(shardFollowTaskParams.getLeaderShardId()))); } }; } @@ -422,6 +558,7 @@ private String getIndexSettings(final int numberOfPrimaryShards, final Map additionalSetting : additionalIndexSettings.entrySet()) { builder.field(additionalSetting.getKey(), additionalSetting.getValue()); } @@ -502,4 +639,29 @@ private String getIndexSettingsWithNestedMapping(final int numberOfPrimaryShards } return settings; } + + private void atLeastDocsIndexed(String index, long numDocsReplicated) throws InterruptedException { + logger.info("waiting for at least [{}] documents to be indexed into index [{}]", numDocsReplicated, index); + awaitBusy(() -> { + refresh(index); + SearchRequest request = new SearchRequest(index); + request.source(new SearchSourceBuilder().size(0)); + SearchResponse response = 
client().search(request).actionGet(); + return response.getHits().getTotalHits() >= numDocsReplicated; + }, 30, TimeUnit.SECONDS); + } + + private void assertSameDocCount(String index1, String index2) throws Exception { + refresh(index1); + SearchRequest request1 = new SearchRequest(index1); + request1.source(new SearchSourceBuilder().size(0)); + SearchResponse response1 = client().search(request1).actionGet(); + assertBusy(() -> { + refresh(index2); + SearchRequest request2 = new SearchRequest(index2); + request2.source(new SearchSourceBuilder().size(0)); + SearchResponse response2 = client().search(request2).actionGet(); + assertThat(response2.getHits().getTotalHits(), equalTo(response1.getHits().getTotalHits())); + }); + } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java deleted file mode 100644 index 9af0d93e9e2bc..0000000000000 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java +++ /dev/null @@ -1,407 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. 
- */ -package org.elasticsearch.xpack.ccr.action; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.client.AdminClient; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.ClusterAdminClient; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.translog.Translog; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.ChunkProcessor; -import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.ChunksCoordinator; -import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.IndexMetadataVersionChecker; -import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction; -import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest; -import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; - -import java.net.ConnectException; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; -import java.util.function.Consumer; - -import static org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.PROCESSOR_RETRY_LIMIT; -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.CoreMatchers.nullValue; -import static org.hamcrest.CoreMatchers.sameInstance; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.same; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -public class 
ChunksCoordinatorTests extends ESTestCase { - - public void testCreateChunks() { - Client client = mock(Client.class); - Executor ccrExecutor = Runnable::run; - ShardId leaderShardId = new ShardId("index1", "index1", 0); - ShardId followShardId = new ShardId("index2", "index1", 0); - - IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(), - followShardId.getIndex(), client, client); - ChunksCoordinator coordinator = new ChunksCoordinator(client, client, ccrExecutor, checker, 1024, 1, - Long.MAX_VALUE, leaderShardId, followShardId, e -> {}); - coordinator.createChucks(0, 1023); - List result = new ArrayList<>(coordinator.getChunks()); - assertThat(result.size(), equalTo(1)); - assertThat(result.get(0)[0], equalTo(0L)); - assertThat(result.get(0)[1], equalTo(1023L)); - - coordinator.getChunks().clear(); - coordinator.createChucks(0, 2047); - result = new ArrayList<>(coordinator.getChunks()); - assertThat(result.size(), equalTo(2)); - assertThat(result.get(0)[0], equalTo(0L)); - assertThat(result.get(0)[1], equalTo(1023L)); - assertThat(result.get(1)[0], equalTo(1024L)); - assertThat(result.get(1)[1], equalTo(2047L)); - - coordinator.getChunks().clear(); - coordinator.createChucks(0, 4095); - result = new ArrayList<>(coordinator.getChunks()); - assertThat(result.size(), equalTo(4)); - assertThat(result.get(0)[0], equalTo(0L)); - assertThat(result.get(0)[1], equalTo(1023L)); - assertThat(result.get(1)[0], equalTo(1024L)); - assertThat(result.get(1)[1], equalTo(2047L)); - assertThat(result.get(2)[0], equalTo(2048L)); - assertThat(result.get(2)[1], equalTo(3071L)); - assertThat(result.get(3)[0], equalTo(3072L)); - assertThat(result.get(3)[1], equalTo(4095L)); - - coordinator.getChunks().clear(); - coordinator.createChucks(4096, 8196); - result = new ArrayList<>(coordinator.getChunks()); - assertThat(result.size(), equalTo(5)); - assertThat(result.get(0)[0], equalTo(4096L)); - assertThat(result.get(0)[1], equalTo(5119L)); - 
assertThat(result.get(1)[0], equalTo(5120L)); - assertThat(result.get(1)[1], equalTo(6143L)); - assertThat(result.get(2)[0], equalTo(6144L)); - assertThat(result.get(2)[1], equalTo(7167L)); - assertThat(result.get(3)[0], equalTo(7168L)); - assertThat(result.get(3)[1], equalTo(8191L)); - assertThat(result.get(4)[0], equalTo(8192L)); - assertThat(result.get(4)[1], equalTo(8196L)); - } - - public void testCoordinator() throws Exception { - Client client = createClientMock(); - - mockShardChangesApiCall(client); - mockBulkShardOperationsApiCall(client); - Executor ccrExecutor = Runnable::run; - ShardId leaderShardId = new ShardId("index1", "index1", 0); - ShardId followShardId = new ShardId("index2", "index1", 0); - - Consumer handler = e -> assertThat(e, nullValue()); - int concurrentProcessors = randomIntBetween(1, 4); - int batchSize = randomIntBetween(1, 1000); - IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(), - followShardId.getIndex(), client, client); - ChunksCoordinator coordinator = new ChunksCoordinator(client, client, ccrExecutor, checker, batchSize, - concurrentProcessors, Long.MAX_VALUE, leaderShardId, followShardId, handler); - - int numberOfOps = randomIntBetween(batchSize, batchSize * 20); - long from = randomInt(1000); - long to = from + numberOfOps - 1; - coordinator.createChucks(from, to); - int expectedNumberOfChunks = numberOfOps / batchSize; - if (numberOfOps % batchSize > 0) { - expectedNumberOfChunks++; - } - assertThat(coordinator.getChunks().size(), equalTo(expectedNumberOfChunks)); - - coordinator.start(); - assertThat(coordinator.getChunks().size(), equalTo(0)); - verify(client, times(expectedNumberOfChunks)).execute(same(ShardChangesAction.INSTANCE), - any(ShardChangesAction.Request.class), any()); - verify(client, times(expectedNumberOfChunks)).execute(same(BulkShardOperationsAction.INSTANCE), - any(BulkShardOperationsRequest.class), any()); - } - - public void testCoordinator_failure() 
throws Exception { - Exception expectedException = new RuntimeException("throw me"); - Client client = createClientMock(); - boolean shardChangesActionApiCallFailed; - if (randomBoolean()) { - shardChangesActionApiCallFailed = true; - doThrow(expectedException).when(client).execute(same(ShardChangesAction.INSTANCE), - any(ShardChangesAction.Request.class), any()); - } else { - shardChangesActionApiCallFailed = false; - mockShardChangesApiCall(client); - doThrow(expectedException).when(client).execute(same(BulkShardOperationsAction.INSTANCE), - any(BulkShardOperationsRequest.class), any()); - } - Executor ccrExecutor = Runnable::run; - ShardId leaderShardId = new ShardId("index1", "index1", 0); - ShardId followShardId = new ShardId("index2", "index1", 0); - - Consumer handler = e -> { - assertThat(e, notNullValue()); - assertThat(e, sameInstance(expectedException)); - }; - IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(), - followShardId.getIndex(), client, client); - ChunksCoordinator coordinator = new ChunksCoordinator(client, client, ccrExecutor, checker, 10, 1, Long.MAX_VALUE, - leaderShardId, followShardId, handler); - coordinator.createChucks(0, 19); - assertThat(coordinator.getChunks().size(), equalTo(2)); - - coordinator.start(); - assertThat(coordinator.getChunks().size(), equalTo(1)); - verify(client, times(1)).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class), - any()); - verify(client, times(shardChangesActionApiCallFailed ? 
0 : 1)).execute(same(BulkShardOperationsAction.INSTANCE), - any(BulkShardOperationsRequest.class), any()); - } - - public void testCoordinator_concurrent() throws Exception { - Client client = createClientMock(); - mockShardChangesApiCall(client); - mockBulkShardOperationsApiCall(client); - Executor ccrExecutor = command -> new Thread(command).start(); - ShardId leaderShardId = new ShardId("index1", "index1", 0); - ShardId followShardId = new ShardId("index2", "index1", 0); - - AtomicBoolean calledOnceChecker = new AtomicBoolean(false); - AtomicReference failureHolder = new AtomicReference<>(); - CountDownLatch latch = new CountDownLatch(1); - Consumer handler = e -> { - if (failureHolder.compareAndSet(null, e) == false) { - // This handler should only be called once, irregardless of the number of concurrent processors - calledOnceChecker.set(true); - } - latch.countDown(); - }; - IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(), - followShardId.getIndex(), client, client); - ChunksCoordinator coordinator = new ChunksCoordinator(client, client, ccrExecutor, checker, 1000, 4, Long.MAX_VALUE, - leaderShardId, followShardId, handler); - coordinator.createChucks(0, 999999); - assertThat(coordinator.getChunks().size(), equalTo(1000)); - - coordinator.start(); - latch.await(); - assertThat(coordinator.getChunks().size(), equalTo(0)); - verify(client, times(1000)).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class), any()); - verify(client, times(1000)).execute(same(BulkShardOperationsAction.INSTANCE), any(BulkShardOperationsRequest.class), any()); - assertThat(calledOnceChecker.get(), is(false)); - } - - public void testChunkProcessor() { - Client client = createClientMock(); - Queue chunks = new LinkedList<>(); - mockShardChangesApiCall(client); - mockBulkShardOperationsApiCall(client); - Executor ccrExecutor = Runnable::run; - ShardId leaderShardId = new ShardId("index1", "index1", 0); - ShardId 
followShardId = new ShardId("index2", "index1", 0); - IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(), - followShardId.getIndex(), client, client); - - boolean[] invoked = new boolean[1]; - Exception[] exception = new Exception[1]; - Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; - ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, checker, leaderShardId, - followShardId, handler); - chunkProcessor.start(0, 10, Long.MAX_VALUE); - assertThat(invoked[0], is(true)); - assertThat(exception[0], nullValue()); - } - - public void testChunkProcessorRetry() { - Client client = createClientMock(); - Queue chunks = new LinkedList<>(); - mockBulkShardOperationsApiCall(client); - int testRetryLimit = randomIntBetween(1, PROCESSOR_RETRY_LIMIT - 1); - mockShardCangesApiCallWithRetry(client, testRetryLimit, new ConnectException("connection exception")); - - Executor ccrExecutor = Runnable::run; - ShardId leaderShardId = new ShardId("index1", "index1", 0); - ShardId followShardId = new ShardId("index2", "index1", 0); - IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(), - followShardId.getIndex(), client, client); - - boolean[] invoked = new boolean[1]; - Exception[] exception = new Exception[1]; - Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; - ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, checker, leaderShardId, - followShardId, handler); - chunkProcessor.start(0, 10, Long.MAX_VALUE); - assertThat(invoked[0], is(true)); - assertThat(exception[0], nullValue()); - assertThat(chunkProcessor.retryCounter.get(), equalTo(testRetryLimit + 1)); - } - - public void testChunkProcessorRetryTooManyTimes() { - Client client = createClientMock(); - Queue chunks = new LinkedList<>(); - mockBulkShardOperationsApiCall(client); - int testRetryLimit = PROCESSOR_RETRY_LIMIT + 1; - 
mockShardCangesApiCallWithRetry(client, testRetryLimit, new ConnectException("connection exception")); - - Executor ccrExecutor = Runnable::run; - ShardId leaderShardId = new ShardId("index1", "index1", 0); - ShardId followShardId = new ShardId("index2", "index1", 0); - IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(), - followShardId.getIndex(), client, client); - - boolean[] invoked = new boolean[1]; - Exception[] exception = new Exception[1]; - Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; - ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, checker, leaderShardId, - followShardId, handler); - chunkProcessor.start(0, 10, Long.MAX_VALUE); - assertThat(invoked[0], is(true)); - assertThat(exception[0], notNullValue()); - assertThat(exception[0].getMessage(), equalTo("retrying failed [17] times, aborting...")); - assertThat(exception[0].getCause().getMessage(), equalTo("connection exception")); - assertThat(chunkProcessor.retryCounter.get(), equalTo(testRetryLimit)); - } - - public void testChunkProcessorNoneRetryableError() { - Client client = createClientMock(); - Queue chunks = new LinkedList<>(); - mockBulkShardOperationsApiCall(client); - mockShardCangesApiCallWithRetry(client, 3, new RuntimeException("unexpected")); - - Executor ccrExecutor = Runnable::run; - ShardId leaderShardId = new ShardId("index1", "index1", 0); - ShardId followShardId = new ShardId("index2", "index1", 0); - IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(), - followShardId.getIndex(), client, client); - - boolean[] invoked = new boolean[1]; - Exception[] exception = new Exception[1]; - Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; - ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, checker, leaderShardId, - followShardId, handler); - chunkProcessor.start(0, 10, Long.MAX_VALUE); - 
assertThat(invoked[0], is(true)); - assertThat(exception[0], notNullValue()); - assertThat(exception[0].getMessage(), equalTo("unexpected")); - assertThat(chunkProcessor.retryCounter.get(), equalTo(0)); - } - - public void testChunkProcessorExceedMaxTranslogsBytes() { - long from = 0; - long to = 20; - long actualTo = 10; - Client client = createClientMock(); - Queue chunks = new LinkedList<>(); - doAnswer(invocation -> { - Object[] args = invocation.getArguments(); - assert args.length == 3; - @SuppressWarnings("unchecked") - ActionListener listener = (ActionListener) args[2]; - - List operations = new ArrayList<>(); - for (int i = 0; i <= actualTo; i++) { - operations.add(new Translog.NoOp(i, 1, "test")); - } - listener.onResponse(new ShardChangesAction.Response(1L, operations.toArray(new Translog.Operation[0]))); - return null; - }).when(client).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class), any()); - - mockBulkShardOperationsApiCall(client); - Executor ccrExecutor = Runnable::run; - ShardId leaderShardId = new ShardId("index1", "index1", 0); - ShardId followShardId = new ShardId("index2", "index1", 0); - IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(), - followShardId.getIndex(), client, client); - - boolean[] invoked = new boolean[1]; - Exception[] exception = new Exception[1]; - Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; - BiConsumer> versionChecker = (indexVersiuon, consumer) -> consumer.accept(null); - ChunkProcessor chunkProcessor = - new ChunkProcessor(client, client, chunks, ccrExecutor, versionChecker, leaderShardId, followShardId, handler); - chunkProcessor.start(from, to, Long.MAX_VALUE); - assertThat(invoked[0], is(true)); - assertThat(exception[0], nullValue()); - assertThat(chunks.size(), equalTo(1)); - assertThat(chunks.peek()[0], equalTo(11L)); - assertThat(chunks.peek()[1], equalTo(20L)); - } - - private Client createClientMock() { - Client 
client = mock(Client.class); - ClusterAdminClient clusterAdminClient = mock(ClusterAdminClient.class); - AdminClient adminClient = mock(AdminClient.class); - when(adminClient.cluster()).thenReturn(clusterAdminClient); - when(client.admin()).thenReturn(adminClient); - return client; - } - - private void mockShardCangesApiCallWithRetry(Client client, int testRetryLimit, Exception e) { - int[] retryCounter = new int[1]; - doAnswer(invocation -> { - Object[] args = invocation.getArguments(); - assert args.length == 3; - ShardChangesAction.Request request = (ShardChangesAction.Request) args[1]; - @SuppressWarnings("unchecked") - ActionListener listener = (ActionListener) args[2]; - if (retryCounter[0]++ <= testRetryLimit) { - listener.onFailure(e); - } else { - long delta = request.getMaxSeqNo() - request.getMinSeqNo(); - Translog.Operation[] operations = new Translog.Operation[(int) delta]; - for (int i = 0; i < operations.length; i++) { - operations[i] = new Translog.NoOp(request.getMinSeqNo() + i, 1, "test"); - } - ShardChangesAction.Response response = new ShardChangesAction.Response(0L, operations); - listener.onResponse(response); - } - return null; - }).when(client).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class), any()); - } - - private void mockShardChangesApiCall(Client client) { - doAnswer(invocation -> { - Object[] args = invocation.getArguments(); - assert args.length == 3; - ShardChangesAction.Request request = (ShardChangesAction.Request) args[1]; - @SuppressWarnings("unchecked") - ActionListener listener = (ActionListener) args[2]; - - List operations = new ArrayList<>(); - for (long i = request.getMinSeqNo(); i <= request.getMaxSeqNo(); i++) { - operations.add(new Translog.NoOp(request.getMinSeqNo() + i, 1, "test")); - } - ShardChangesAction.Response response = new ShardChangesAction.Response(0L, operations.toArray(new Translog.Operation[0])); - listener.onResponse(response); - return null; - 
}).when(client).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class), any()); - } - - private void mockBulkShardOperationsApiCall(Client client) { - doAnswer(invocation -> { - Object[] args = invocation.getArguments(); - assert args.length == 3; - @SuppressWarnings("unchecked") - ActionListener listener = (ActionListener) args[2]; - listener.onResponse(new BulkShardOperationsResponse()); - return null; - }).when(client).execute(same(BulkShardOperationsAction.INSTANCE), any(BulkShardOperationsRequest.class), any()); - } - -} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexRequestTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexRequestTests.java index 9a0557b369a77..73c4a192a63f5 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexRequestTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexRequestTests.java @@ -23,9 +23,12 @@ static FollowIndexAction.Request createTestRequest() { FollowIndexAction.Request request = new FollowIndexAction.Request(); request.setLeaderIndex(randomAlphaOfLength(4)); request.setFollowIndex(randomAlphaOfLength(4)); - request.setBatchSize(randomNonNegativeLong()); - request.setConcurrentProcessors(randomIntBetween(0, Integer.MAX_VALUE)); + request.setMaxReadSize(randomIntBetween(1, Integer.MAX_VALUE)); + request.setMaxConcurrentReads(randomIntBetween(1, Integer.MAX_VALUE)); request.setProcessorMaxTranslogBytes(randomNonNegativeLong()); + request.setMaxWriteSize(randomIntBetween(1, Integer.MAX_VALUE)); + request.setMaxConcurrentWrites(randomIntBetween(1, Integer.MAX_VALUE)); + request.setMaxBufferSize(randomIntBetween(1, Integer.MAX_VALUE)); return request; } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java 
b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java index 7e06ce120a86a..56440dc53f57b 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java @@ -48,7 +48,7 @@ public void testGetOperationsBetween() throws Exception { IndexShard indexShard = indexService.getShard(0); for (int iter = 0; iter < iters; iter++) { int min = randomIntBetween(0, numWrites - 1); - int max = randomIntBetween(min, numWrites - 1); + long max = randomIntBetween(min, numWrites - 1); final Translog.Operation[] operations = ShardChangesAction.getOperationsBetween(indexShard, min, max, Long.MAX_VALUE); final List seenSeqNos = Arrays.stream(operations).map(Translog.Operation::seqNo).collect(Collectors.toList()); final List expectedSeqNos = LongStream.range(min, max + 1).boxed().collect(Collectors.toList()); @@ -57,13 +57,13 @@ public void testGetOperationsBetween() throws Exception { // get operations for a range no operations exists: Exception e = expectThrows(IllegalStateException.class, - () -> ShardChangesAction.getOperationsBetween(indexShard, numWrites, numWrites + 1, Long.MAX_VALUE)); + () -> ShardChangesAction.getOperationsBetween(indexShard, numWrites, (long) numWrites + 1, Long.MAX_VALUE)); assertThat(e.getMessage(), containsString("Not all operations between min_seqno [" + numWrites + "] and max_seqno [" + (numWrites + 1) +"] found")); // get operations for a range some operations do not exist: e = expectThrows(IllegalStateException.class, - () -> ShardChangesAction.getOperationsBetween(indexShard, numWrites - 10, numWrites + 10, Long.MAX_VALUE)); + () -> ShardChangesAction.getOperationsBetween(indexShard, numWrites - 10, (long) numWrites + 10, Long.MAX_VALUE)); assertThat(e.getMessage(), containsString("Not all operations between min_seqno [" + (numWrites - 10) + "] and max_seqno [" + 
(numWrites + 10) +"] found")); } @@ -81,7 +81,7 @@ public void testGetOperationsBetweenWhenShardNotStarted() throws Exception { ShardRouting shardRouting = TestShardRouting.newShardRouting("index", 0, "_node_id", true, ShardRoutingState.INITIALIZING); Mockito.when(indexShard.routingEntry()).thenReturn(shardRouting); - expectThrows(IndexShardNotStartedException.class, () -> ShardChangesAction.getOperationsBetween(indexShard, 0, 1, Long.MAX_VALUE)); + expectThrows(IndexShardNotStartedException.class, () -> ShardChangesAction.getOperationsBetween(indexShard, 0, 1L, Long.MAX_VALUE)); } public void testGetOperationsBetweenExceedByteLimit() throws Exception { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesRequestTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesRequestTests.java index 3f30545576046..dfb5b11b94801 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesRequestTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesRequestTests.java @@ -32,9 +32,10 @@ public void testValidate() { assertThat(request.validate().getMessage(), containsString("minSeqNo [-1] cannot be lower than 0")); request.setMinSeqNo(4); + request.setMaxSeqNo(0L); assertThat(request.validate().getMessage(), containsString("minSeqNo [4] cannot be larger than maxSeqNo [0]")); - request.setMaxSeqNo(8); + request.setMaxSeqNo(8L); assertThat(request.validate(), nullValue()); } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java index 4902917ab53e3..2b992c8bc6926 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java @@ 
-13,12 +13,13 @@ public class ShardChangesResponseTests extends AbstractStreamableTestCase failureHolder = new AtomicReference<>(); + + public void testDefaults() throws Exception { + task = createShardFollowTask(ShardFollowNodeTask.DEFAULT_MAX_READ_SIZE, ShardFollowNodeTask.DEFAULT_MAX_CONCURRENT_READS, + ShardFollowNodeTask.DEFAULT_MAX_WRITE_SIZE, ShardFollowNodeTask.DEFAULT_MAX_CONCURRENT_WRITES, 10000, + ShardFollowNodeTask.DEFAULT_MAX_BUFFER_SIZE); + task.start(randomIntBetween(-1, 2048), -1); + + assertBusy(() -> { + assertThat(task.getStatus().getProcessedGlobalCheckpoint(), equalTo(10000L)); + }); + assertThat(mappingUpdateCounter.get(), equalTo(1)); + } + + public void testHitBufferLimit() throws Exception { + // Setting buffer limit to 100, so that we are sure the limit will be met + task = createShardFollowTask(ShardFollowNodeTask.DEFAULT_MAX_READ_SIZE, 3, + ShardFollowNodeTask.DEFAULT_MAX_WRITE_SIZE, 1, 10000, 100); + task.start(-1, -1); + + assertBusy(() -> { + assertThat(task.getStatus().getProcessedGlobalCheckpoint(), equalTo(10000L)); + }); + } + + public void testConcurrentReadsAndWrites() throws Exception { + task = createShardFollowTask(randomIntBetween(32, 2048), randomIntBetween(2, 10), randomIntBetween(32, 2048), + randomIntBetween(2, 10), 50000, 10240); + task.start(randomIntBetween(-1, 2048), -1); + + assertBusy(() -> { + assertThat(task.getStatus().getProcessedGlobalCheckpoint(), equalTo(50000L)); + }); + } + + public void testMappingUpdate() throws Exception { + task = createShardFollowTask(1024, 1, 1024, 1, 10000, 1024); + task.start(-1, -1); + + assertBusy(() -> { + assertThat(task.getStatus().getProcessedGlobalCheckpoint(), greaterThanOrEqualTo(1000L)); + }); + imdVersion.set(2L); + assertBusy(() -> { + assertThat(task.getStatus().getProcessedGlobalCheckpoint(), equalTo(10000L)); + }); + assertThat(mappingUpdateCounter.get(), equalTo(2)); + } + + public void testOccasionalApiFailure() throws Exception { + task = 
createShardFollowTask(1024, 1, 1024, 1, 10000, 1024); + task.start(-1, -1); + randomlyFailWithRetryableError.set(true); + assertBusy(() -> { + assertThat(task.getStatus().getProcessedGlobalCheckpoint(), equalTo(10000L)); + }); + assertThat(failedRequests.get(), greaterThan(0)); + } + + public void testNotAllExpectedOpsReturned() throws Exception { + task = createShardFollowTask(1024, 1, 1024, 1, 10000, 1024); + task.start(-1, -1); + randomlyTruncateRequests.set(true); + assertBusy(() -> { + assertThat(task.getStatus().getProcessedGlobalCheckpoint(), equalTo(10000L)); + }); + assertThat(truncatedRequests.get(), greaterThan(0)); + } + + ShardFollowNodeTask createShardFollowTask(int maxReadSize, int maxConcurrentReads, int maxWriteSize, + int maxConcurrentWrites, int leaderGlobalCheckpoint, int bufferLimit) { + imdVersion = new AtomicLong(1L); + mappingUpdateCounter = new AtomicInteger(0); + randomlyTruncateRequests = new AtomicBoolean(false); + truncatedRequests = new AtomicInteger(); + randomlyFailWithRetryableError = new AtomicBoolean(false); + failedRequests = new AtomicInteger(0); + AtomicBoolean stopped = new AtomicBoolean(false); + ShardFollowTask params = new ShardFollowTask(null, new ShardId("follow_index", "", 0), + new ShardId("leader_index", "", 0), maxReadSize, maxConcurrentReads, ShardFollowNodeTask.DEFAULT_MAX_TRANSLOG_BYTES, + maxWriteSize, maxConcurrentWrites, bufferLimit, Collections.emptyMap()); + + BiConsumer scheduler = (delay, task) -> { + try { + Thread.sleep(delay.millis()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + Thread thread = new Thread(task); + thread.start(); + }; + AtomicInteger readCounter = new AtomicInteger(); + AtomicInteger writeCounter = new AtomicInteger(); + LocalCheckpointTracker tracker = new LocalCheckpointTracker(SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED); + return new ShardFollowNodeTask(1L, "type", ShardFollowTask.NAME, "description", null, + 
Collections.emptyMap(), null, null, params, scheduler) { + + @Override + protected void updateMapping(LongConsumer handler) { + mappingUpdateCounter.incrementAndGet(); + handler.accept(imdVersion.get()); + } + + @Override + protected void innerSendBulkShardOperationsRequest(Translog.Operation[] operations, LongConsumer handler, + Consumer errorHandler) { + if (randomlyFailWithRetryableError.get() && writeCounter.incrementAndGet() % 5 == 0) { + failedRequests.incrementAndGet(); + errorHandler.accept(new UnavailableShardsException(params.getFollowShardId(), "test error")); + return; + } + + for (Translog.Operation op : operations) { + tracker.markSeqNoAsCompleted(op.seqNo()); + } + + // Emulate a network thread and avoid a StackOverflowError: + Thread thread = new Thread(() -> handler.accept(tracker.getCheckpoint())); + thread.start(); + } + + @Override + protected void innerSendShardChangesRequest(long from, Long to, Consumer handler, + Consumer errorHandler) { + if (randomlyFailWithRetryableError.get() && readCounter.incrementAndGet() % 5 == 0) { + failedRequests.incrementAndGet(); + errorHandler.accept(new UnavailableShardsException(params.getFollowShardId(), "test error")); + return; + } + + if (from < 0) { + errorHandler.accept(new IllegalArgumentException()); + return; + } + + ShardChangesAction.Response response; + if (from >= leaderGlobalCheckpoint) { + response = new ShardChangesAction.Response(1L, leaderGlobalCheckpoint, new Translog.Operation[0]); + } else { + int size = to == null ? 
100 : (int) (to - from + 1); + if (randomlyTruncateRequests.get() && size > 10 && truncatedRequests.get() < 5) { + truncatedRequests.incrementAndGet(); + size = size / 2; + } + Translog.Operation[] ops = new Translog.Operation[size]; + for (int i = 0; i < ops.length; i++) { + ops[i] = new Translog.Index("doc", UUIDs.randomBase64UUID(), from + i, 0, "{}".getBytes(StandardCharsets.UTF_8)); + } + response = new ShardChangesAction.Response(imdVersion.get(), leaderGlobalCheckpoint, ops); + } + // Emulate network thread and avoid SO: + Thread thread = new Thread(() -> handler.accept(response)); + thread.start(); + } + + @Override + protected boolean isStopped() { + return stopped.get(); + } + + @Override + public void markAsCompleted() { + stopped.set(true); + } + + @Override + public void markAsFailed(Exception e) { + stopped.set(true); + failureHolder.set(e); + } + }; + } + + @After + public void cancelNodeTask() throws Exception { + if (task != null){ + task.markAsCompleted(); + assertThat(failureHolder.get(), nullValue()); + assertBusy(() -> { + ShardFollowNodeTask.Status status = task.getStatus(); + assertThat(status.getNumberOfConcurrentReads(), equalTo(0)); + assertThat(status.getNumberOfConcurrentWrites(), equalTo(0)); + }); + } + } + +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java index b128e88e63a5a..5f6370318f7f8 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java @@ -23,11 +23,16 @@ protected ShardFollowTask doParseInstance(XContentParser parser) throws IOExcept @Override protected ShardFollowTask createTestInstance() { return new ShardFollowTask( - randomAlphaOfLength(4), - new ShardId(randomAlphaOfLength(4), randomAlphaOfLength(4), randomInt(5)), - new 
ShardId(randomAlphaOfLength(4), randomAlphaOfLength(4), randomInt(5)), - randomIntBetween(1, Integer.MAX_VALUE), randomIntBetween(1, Integer.MAX_VALUE), - randomIntBetween(1, Integer.MAX_VALUE), randomBoolean() ? null : Collections.singletonMap("key", "value")); + randomAlphaOfLength(4), + new ShardId(randomAlphaOfLength(4), randomAlphaOfLength(4), randomInt(5)), + new ShardId(randomAlphaOfLength(4), randomAlphaOfLength(4), randomInt(5)), + randomIntBetween(1, Integer.MAX_VALUE), + randomIntBetween(1, Integer.MAX_VALUE), + randomNonNegativeLong(), + randomIntBetween(1, Integer.MAX_VALUE), + randomIntBetween(1, Integer.MAX_VALUE), + randomIntBetween(1, Integer.MAX_VALUE), + randomBoolean() ? null : Collections.singletonMap("key", "value")); } @Override