From 8e1ef0cff93d30a130a106cbd4838fc68231f509 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 10 Jul 2018 16:00:55 +0200 Subject: [PATCH] Rewrite shard follow node task logic (#31581) The current shard follow mechanism is complex and does not give us easy ways the have visibility into the system (e.g. why we are falling behind). The main reason why it is complex is because the current design is highly asynchronous. Also in the current model it is hard to apply backpressure other than reducing the concurrent reads from the leader shard. This PR has the following changes: * Rewrote the shard follow task to coordinate the shard follow mechanism between a leader and follow shard in a single threaded manner. This allows for better unit testing and makes it easier to add stats. * All write operations read from the shard changes api should be added to a buffer instead of directly sending it to the bulk shard operations api. This allows to apply backpressure. In this PR there is a limit that controls how many write ops are allowed in the buffer after which no new reads will be performed until the number of ops is below that limit. * The shard changes api includes the current global checkpoint on the leader shard copy. This allows reading to be a more self sufficient process; instead of relying on a background thread to fetch the leader shard's global checkpoint. * Reading write operations from the leader shard (via shard changes api) is a separate step then writing the write operations (via bulk shards operations api). Whereas before a read would immediately result into a write. * The bulk shard operations api returns the local checkpoint on the follow primary shard, to keep the shard follow task up to date with what has been written. * Moved the shard follow logic that was previously in ShardFollowTasksExecutor to ShardFollowNodeTask. * Moved over the changes from #31242 to make shard follow mechanism resilient from node and shard failures. Relates to #30086 --- .../replication/TransportWriteAction.java | 2 + .../index/engine/LuceneChangesSnapshot.java | 8 +- .../engine/LuceneChangesSnapshotTests.java | 12 +- .../xpack/ccr/FollowIndexSecurityIT.java | 8 +- .../xpack/ccr/FollowIndexIT.java | 2 + .../action/CreateAndFollowIndexAction.java | 11 +- .../xpack/ccr/action/FollowIndexAction.java | 118 +++-- .../xpack/ccr/action/ShardChangesAction.java | 114 +++-- .../xpack/ccr/action/ShardFollowNodeTask.java | 358 +++++++++++++- .../xpack/ccr/action/ShardFollowTask.java | 129 +++-- .../ccr/action/ShardFollowTasksExecutor.java | 448 +++--------------- .../bulk/BulkShardOperationsRequest.java | 17 +- .../bulk/BulkShardOperationsResponse.java | 27 ++ .../TransportBulkShardOperationsAction.java | 35 +- .../rest/RestCreateAndFollowIndexAction.java | 3 +- .../xpack/ccr/rest/RestFollowIndexAction.java | 36 +- .../xpack/ccr/ShardChangesIT.java | 284 ++++++++--- .../ccr/action/ChunksCoordinatorTests.java | 409 ---------------- .../CreateAndFollowIndexRequestTests.java | 4 +- .../ccr/action/FollowIndexActionTests.java | 8 +- .../ccr/action/FollowIndexRequestTests.java | 11 +- .../ccr/action/ShardChangesActionTests.java | 60 +-- .../ccr/action/ShardChangesRequestTests.java | 15 +- .../ccr/action/ShardChangesResponseTests.java | 3 +- .../ShardFollowNodeTaskStatusTests.java | 3 +- .../ccr/action/ShardFollowNodeTaskTests.java | 241 ++++++++++ .../ccr/action/ShardFollowTaskTests.java | 17 +- .../action/bulk/BulkShardOperationsTests.java | 17 +- 28 files changed, 1269 insertions(+), 1131 deletions(-) delete mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index ca91a32a17a3a..577426637eceb 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -130,6 +130,7 @@ public static class WritePrimaryResult listener = null; public WritePrimaryResult(ReplicaRequest request, @Nullable Response finalResponse, @@ -137,6 +138,7 @@ public WritePrimaryResult(ReplicaRequest request, @Nullable Response finalRespon IndexShard primary, Logger logger) { super(request, finalResponse, operationFailure); this.location = location; + this.primary = primary; assert location == null || operationFailure == null : "expected either failure to be null or translog location to be null, " + "but found: [" + location + "] translog location and [" + operationFailure + "] failure"; diff --git a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java index 991d188787c4a..0a876468914f8 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java @@ -141,14 +141,14 @@ public Translog.Operation next() throws IOException { private void rangeCheck(Translog.Operation op) { if (op == null) { if (lastSeenSeqNo < toSeqNo) { - throw new IllegalStateException("Not all operations between min_seqno [" + fromSeqNo + "] " + - "and max_seqno [" + toSeqNo + "] found; prematurely terminated last_seen_seqno [" + lastSeenSeqNo + "]"); + throw new IllegalStateException("Not all operations between from_seqno [" + fromSeqNo + "] " + + "and to_seqno [" + toSeqNo + "] found; prematurely terminated last_seen_seqno [" + lastSeenSeqNo + "]"); } } else { final long expectedSeqNo = lastSeenSeqNo + 1; if (op.seqNo() != expectedSeqNo) { - throw new IllegalStateException("Not all operations between min_seqno [" + fromSeqNo + "] " + - "and max_seqno [" + toSeqNo + "] found; expected seqno [" + expectedSeqNo + "]; found [" + op + "]"); + throw new IllegalStateException("Not all operations between from_seqno [" + fromSeqNo + "] " + + "and to_seqno [" + toSeqNo + "] found; expected seqno [" + expectedSeqNo + "]; found [" + op + "]"); } } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java index 0c207ca9607ff..7de086a3be239 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java @@ -65,7 +65,7 @@ public void testBasics() throws Exception { try (Translog.Snapshot snapshot = engine.newLuceneChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, true)) { IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot)); assertThat(error.getMessage(), - containsString("Not all operations between min_seqno [" + fromSeqNo + "] and max_seqno [" + toSeqNo + "] found")); + containsString("Not all operations between from_seqno [" + fromSeqNo + "] and to_seqno [" + toSeqNo + "] found")); } try (Translog.Snapshot snapshot = engine.newLuceneChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, false)) { assertThat(snapshot, SnapshotMatchers.size(0)); @@ -108,11 +108,11 @@ searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), f searcher = null; IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot)); assertThat(error.getMessage(), - containsString("Not all operations between min_seqno [" + fromSeqNo + "] and max_seqno [" + toSeqNo + "] found")); + containsString("Not all operations between from_seqno [" + fromSeqNo + "] and to_seqno [" + toSeqNo + "] found")); }finally { IOUtils.close(searcher); } - }else { + } else { fromSeqNo = randomLongBetween(0, refreshedSeqNo); toSeqNo = randomLongBetween(refreshedSeqNo + 1, numOps * 2); Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); @@ -120,7 +120,7 @@ searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), f searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, false)) { searcher = null; assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, refreshedSeqNo)); - }finally { + } finally { IOUtils.close(searcher); } searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); @@ -129,7 +129,7 @@ searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), f searcher = null; IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot)); assertThat(error.getMessage(), - containsString("Not all operations between min_seqno [" + fromSeqNo + "] and max_seqno [" + toSeqNo + "] found")); + containsString("Not all operations between from_seqno [" + fromSeqNo + "] and to_seqno [" + toSeqNo + "] found")); }finally { IOUtils.close(searcher); } @@ -139,7 +139,7 @@ searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), f searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, true)) { searcher = null; assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, toSeqNo)); - }finally { + } finally { IOUtils.close(searcher); } } diff --git a/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java b/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java index d8c0c2f0c27df..bfd32f385d20c 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java @@ -84,7 +84,7 @@ public void testFollowIndex() throws Exception { assertThat(tasks.size(), equalTo(0)); assertThat(countCcrNodeTasks(), equalTo(0)); }); - + followIndex("leader_cluster:" + allowedIndex, allowedIndex); assertThat(countCcrNodeTasks(), equalTo(1)); assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_xpack/ccr/_unfollow"))); @@ -95,14 +95,14 @@ public void testFollowIndex() throws Exception { assertThat(tasks.size(), equalTo(0)); assertThat(countCcrNodeTasks(), equalTo(0)); }); - + createAndFollowIndex("leader_cluster:" + unallowedIndex, unallowedIndex); // Verify that nothing has been replicated and no node tasks are running // These node tasks should have been failed due to the fact that the user // has no sufficient priviledges. assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0))); verifyDocuments(adminClient(), unallowedIndex, 0); - + followIndex("leader_cluster:" + unallowedIndex, unallowedIndex); assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0))); verifyDocuments(adminClient(), unallowedIndex, 0); @@ -146,12 +146,14 @@ private static void refresh(String index) throws IOException { private static void followIndex(String leaderIndex, String followIndex) throws IOException { final Request request = new Request("POST", "/" + followIndex + "/_xpack/ccr/_follow"); request.addParameter("leader_index", leaderIndex); + request.addParameter("idle_shard_retry_delay", "10ms"); assertOK(client().performRequest(request)); } private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException { final Request request = new Request("POST", "/" + followIndex + "/_xpack/ccr/_create_and_follow"); request.addParameter("leader_index", leaderIndex); + request.addParameter("idle_shard_retry_delay", "10ms"); assertOK(client().performRequest(request)); } diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java index 5a52970e7f21d..8e637f7dbc183 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java @@ -96,12 +96,14 @@ private static void refresh(String index) throws IOException { private static void followIndex(String leaderIndex, String followIndex) throws IOException { final Request request = new Request("POST", "/" + followIndex + "/_xpack/ccr/_follow"); request.addParameter("leader_index", leaderIndex); + request.addParameter("idle_shard_retry_delay", "10ms"); assertOK(client().performRequest(request)); } private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException { final Request request = new Request("POST", "/" + followIndex + "/_xpack/ccr/_create_and_follow"); request.addParameter("leader_index", leaderIndex); + request.addParameter("idle_shard_retry_delay", "10ms"); assertOK(client().performRequest(request)); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java index 78d6175f39efb..1a887b6d010bf 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java @@ -65,12 +65,15 @@ public static class Request extends AcknowledgedRequest { private FollowIndexAction.Request followRequest; - public FollowIndexAction.Request getFollowRequest() { - return followRequest; + public Request(FollowIndexAction.Request followRequest) { + this.followRequest = Objects.requireNonNull(followRequest); + } + + Request() { } - public void setFollowRequest(FollowIndexAction.Request followRequest) { - this.followRequest = followRequest; + public FollowIndexAction.Request getFollowRequest() { + return followRequest; } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java index 21c56b53f2e62..9d4ea57b4a4f2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexingSlowLog; import org.elasticsearch.index.SearchSlowLog; @@ -71,50 +72,57 @@ public static class Request extends ActionRequest { private String leaderIndex; private String followIndex; - private long batchSize = ShardFollowTasksExecutor.DEFAULT_BATCH_SIZE; - private int concurrentProcessors = ShardFollowTasksExecutor.DEFAULT_CONCURRENT_PROCESSORS; - private long processorMaxTranslogBytes = ShardFollowTasksExecutor.DEFAULT_MAX_TRANSLOG_BYTES; - - public String getLeaderIndex() { - return leaderIndex; - } - - public void setLeaderIndex(String leaderIndex) { - this.leaderIndex = leaderIndex; - } - - public String getFollowIndex() { - return followIndex; - } + private int maxBatchOperationCount; + private int maxConcurrentReadBatches; + private long maxOperationSizeInBytes; + private int maxConcurrentWriteBatches; + private int maxWriteBufferSize; + private TimeValue retryTimeout; + private TimeValue idleShardRetryDelay; + + public Request(String leaderIndex, String followIndex, int maxBatchOperationCount, int maxConcurrentReadBatches, + long maxOperationSizeInBytes, int maxConcurrentWriteBatches, int maxWriteBufferSize, + TimeValue retryTimeout, TimeValue idleShardRetryDelay) { + if (maxBatchOperationCount < 1) { + throw new IllegalArgumentException("maxBatchOperationCount must be larger than 0"); + } + if (maxConcurrentReadBatches < 1) { + throw new IllegalArgumentException("concurrent_processors must be larger than 0"); + } + if (maxOperationSizeInBytes <= 0) { + throw new IllegalArgumentException("processor_max_translog_bytes must be larger than 0"); + } + if (maxConcurrentWriteBatches < 1) { + throw new IllegalArgumentException("maxConcurrentWriteBatches must be larger than 0"); + } + if (maxWriteBufferSize < 1) { + throw new IllegalArgumentException("maxWriteBufferSize must be larger than 0"); + } - public void setFollowIndex(String followIndex) { - this.followIndex = followIndex; + this.leaderIndex = Objects.requireNonNull(leaderIndex); + this.followIndex = Objects.requireNonNull(followIndex); + this.maxBatchOperationCount = maxBatchOperationCount; + this.maxConcurrentReadBatches = maxConcurrentReadBatches; + this.maxOperationSizeInBytes = maxOperationSizeInBytes; + this.maxConcurrentWriteBatches = maxConcurrentWriteBatches; + this.maxWriteBufferSize = maxWriteBufferSize; + this.retryTimeout = Objects.requireNonNull(retryTimeout); + this.idleShardRetryDelay = Objects.requireNonNull(idleShardRetryDelay); } - public long getBatchSize() { - return batchSize; + Request() { } - public void setBatchSize(long batchSize) { - if (batchSize < 1) { - throw new IllegalArgumentException("Illegal batch_size [" + batchSize + "]"); - } - - this.batchSize = batchSize; + public String getLeaderIndex() { + return leaderIndex; } - public void setConcurrentProcessors(int concurrentProcessors) { - if (concurrentProcessors < 1) { - throw new IllegalArgumentException("concurrent_processors must be larger than 0"); - } - this.concurrentProcessors = concurrentProcessors; + public String getFollowIndex() { + return followIndex; } - public void setProcessorMaxTranslogBytes(long processorMaxTranslogBytes) { - if (processorMaxTranslogBytes <= 0) { - throw new IllegalArgumentException("processor_max_translog_bytes must be larger than 0"); - } - this.processorMaxTranslogBytes = processorMaxTranslogBytes; + public int getMaxBatchOperationCount() { + return maxBatchOperationCount; } @Override @@ -127,9 +135,13 @@ public void readFrom(StreamInput in) throws IOException { super.readFrom(in); leaderIndex = in.readString(); followIndex = in.readString(); - batchSize = in.readVLong(); - concurrentProcessors = in.readVInt(); - processorMaxTranslogBytes = in.readVLong(); + maxBatchOperationCount = in.readVInt(); + maxConcurrentReadBatches = in.readVInt(); + maxOperationSizeInBytes = in.readVLong(); + maxConcurrentWriteBatches = in.readVInt(); + maxWriteBufferSize = in.readVInt(); + retryTimeout = in.readOptionalTimeValue(); + idleShardRetryDelay = in.readOptionalTimeValue(); } @Override @@ -137,9 +149,13 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(leaderIndex); out.writeString(followIndex); - out.writeVLong(batchSize); - out.writeVInt(concurrentProcessors); - out.writeVLong(processorMaxTranslogBytes); + out.writeVInt(maxBatchOperationCount); + out.writeVInt(maxConcurrentReadBatches); + out.writeVLong(maxOperationSizeInBytes); + out.writeVInt(maxConcurrentWriteBatches); + out.writeVInt(maxWriteBufferSize); + out.writeOptionalTimeValue(retryTimeout); + out.writeOptionalTimeValue(idleShardRetryDelay); } @Override @@ -147,16 +163,21 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Request request = (Request) o; - return batchSize == request.batchSize && - concurrentProcessors == request.concurrentProcessors && - processorMaxTranslogBytes == request.processorMaxTranslogBytes && + return maxBatchOperationCount == request.maxBatchOperationCount && + maxConcurrentReadBatches == request.maxConcurrentReadBatches && + maxOperationSizeInBytes == request.maxOperationSizeInBytes && + maxConcurrentWriteBatches == request.maxConcurrentWriteBatches && + maxWriteBufferSize == request.maxWriteBufferSize && + Objects.equals(retryTimeout, request.retryTimeout) && + Objects.equals(idleShardRetryDelay, request.idleShardRetryDelay) && Objects.equals(leaderIndex, request.leaderIndex) && Objects.equals(followIndex, request.followIndex); } @Override public int hashCode() { - return Objects.hash(leaderIndex, followIndex, batchSize, concurrentProcessors, processorMaxTranslogBytes); + return Objects.hash(leaderIndex, followIndex, maxBatchOperationCount, maxConcurrentReadBatches, maxOperationSizeInBytes, + maxConcurrentWriteBatches, maxWriteBufferSize, retryTimeout, idleShardRetryDelay); } } @@ -197,7 +218,7 @@ protected void doExecute(Task task, Request request, ActionListener li ClusterState localClusterState = clusterService.state(); IndexMetaData followIndexMetadata = localClusterState.getMetaData().index(request.followIndex); - String[] indices = new String[]{request.getLeaderIndex()}; + String[] indices = new String[]{request.leaderIndex}; Map> remoteClusterIndices = remoteClusterService.groupClusterIndices(indices, s -> false); if (remoteClusterIndices.containsKey(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { // Following an index in local cluster, so use local cluster state to fetch leader IndexMetaData: @@ -251,10 +272,13 @@ void start(Request request, String clusterNameAlias, IndexMetaData leaderIndexMe .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));for (int i = 0; i < numShards; i++) { final int shardId = i; String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId; + ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias, new ShardId(followIndexMetadata.getIndex(), shardId), new ShardId(leaderIndexMetadata.getIndex(), shardId), - request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes, filteredHeaders); + request.maxBatchOperationCount, request.maxConcurrentReadBatches, request.maxOperationSizeInBytes, + request.maxConcurrentWriteBatches, request.maxWriteBufferSize, request.retryTimeout, + request.idleShardRetryDelay, filteredHeaders); persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask, new ActionListener>() { @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index c72b13a701f41..dd67f09bd7246 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -53,10 +53,10 @@ public Response newResponse() { public static class Request extends SingleShardRequest { - private long minSeqNo; - private long maxSeqNo; + private long fromSeqNo; + private int maxOperationCount; private ShardId shardId; - private long maxTranslogsBytes = ShardFollowTasksExecutor.DEFAULT_MAX_TRANSLOG_BYTES; + private long maxOperationSizeInBytes = ShardFollowNodeTask.DEFAULT_MAX_BATCH_SIZE_IN_BYTES; public Request(ShardId shardId) { super(shardId.getIndexName()); @@ -70,42 +70,42 @@ public ShardId getShard() { return shardId; } - public long getMinSeqNo() { - return minSeqNo; + public long getFromSeqNo() { + return fromSeqNo; } - public void setMinSeqNo(long minSeqNo) { - this.minSeqNo = minSeqNo; + public void setFromSeqNo(long fromSeqNo) { + this.fromSeqNo = fromSeqNo; } - public long getMaxSeqNo() { - return maxSeqNo; + public int getMaxOperationCount() { + return maxOperationCount; } - public void setMaxSeqNo(long maxSeqNo) { - this.maxSeqNo = maxSeqNo; + public void setMaxOperationCount(int maxOperationCount) { + this.maxOperationCount = maxOperationCount; } - public long getMaxTranslogsBytes() { - return maxTranslogsBytes; + public long getMaxOperationSizeInBytes() { + return maxOperationSizeInBytes; } - public void setMaxTranslogsBytes(long maxTranslogsBytes) { - this.maxTranslogsBytes = maxTranslogsBytes; + public void setMaxOperationSizeInBytes(long maxOperationSizeInBytes) { + this.maxOperationSizeInBytes = maxOperationSizeInBytes; } @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; - if (minSeqNo < 0) { - validationException = addValidationError("minSeqNo [" + minSeqNo + "] cannot be lower than 0", validationException); + if (fromSeqNo < 0) { + validationException = addValidationError("fromSeqNo [" + fromSeqNo + "] cannot be lower than 0", validationException); } - if (maxSeqNo < minSeqNo) { - validationException = addValidationError("minSeqNo [" + minSeqNo + "] cannot be larger than maxSeqNo [" - + maxSeqNo + "]", validationException); + if (maxOperationCount < 0) { + validationException = addValidationError("maxOperationCount [" + maxOperationCount + + "] cannot be lower than 0", validationException); } - if (maxTranslogsBytes <= 0) { - validationException = addValidationError("maxTranslogsBytes [" + maxTranslogsBytes + "] must be larger than 0", + if (maxOperationSizeInBytes <= 0) { + validationException = addValidationError("maxOperationSizeInBytes [" + maxOperationSizeInBytes + "] must be larger than 0", validationException); } return validationException; @@ -114,19 +114,19 @@ public ActionRequestValidationException validate() { @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - minSeqNo = in.readVLong(); - maxSeqNo = in.readVLong(); + fromSeqNo = in.readVLong(); + maxOperationCount = in.readVInt(); shardId = ShardId.readShardId(in); - maxTranslogsBytes = in.readVLong(); + maxOperationSizeInBytes = in.readVLong(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeVLong(minSeqNo); - out.writeVLong(maxSeqNo); + out.writeVLong(fromSeqNo); + out.writeVInt(maxOperationCount); shardId.writeTo(out); - out.writeVLong(maxTranslogsBytes); + out.writeVLong(maxOperationSizeInBytes); } @@ -135,28 +135,30 @@ public boolean equals(final Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; final Request request = (Request) o; - return minSeqNo == request.minSeqNo && - maxSeqNo == request.maxSeqNo && + return fromSeqNo == request.fromSeqNo && + maxOperationCount == request.maxOperationCount && Objects.equals(shardId, request.shardId) && - maxTranslogsBytes == request.maxTranslogsBytes; + maxOperationSizeInBytes == request.maxOperationSizeInBytes; } @Override public int hashCode() { - return Objects.hash(minSeqNo, maxSeqNo, shardId, maxTranslogsBytes); + return Objects.hash(fromSeqNo, maxOperationCount, shardId, maxOperationSizeInBytes); } } public static final class Response extends ActionResponse { private long indexMetadataVersion; + private long globalCheckpoint; private Translog.Operation[] operations; Response() { } - Response(long indexMetadataVersion, final Translog.Operation[] operations) { + Response(long indexMetadataVersion, long globalCheckpoint, final Translog.Operation[] operations) { this.indexMetadataVersion = indexMetadataVersion; + this.globalCheckpoint = globalCheckpoint; this.operations = operations; } @@ -164,6 +166,10 @@ public long getIndexMetadataVersion() { return indexMetadataVersion; } + public long getGlobalCheckpoint() { + return globalCheckpoint; + } + public Translog.Operation[] getOperations() { return operations; } @@ -172,6 +178,7 @@ public Translog.Operation[] getOperations() { public void readFrom(final StreamInput in) throws IOException { super.readFrom(in); indexMetadataVersion = in.readVLong(); + globalCheckpoint = in.readZLong(); operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new); } @@ -179,6 +186,7 @@ public void readFrom(final StreamInput in) throws IOException { public void writeTo(final StreamOutput out) throws IOException { super.writeTo(out); out.writeVLong(indexMetadataVersion); + out.writeZLong(globalCheckpoint); out.writeArray(Translog.Operation::writeOperation, operations); } @@ -188,13 +196,15 @@ public boolean equals(final Object o) { if (o == null || getClass() != o.getClass()) return false; final Response response = (Response) o; return indexMetadataVersion == response.indexMetadataVersion && - Arrays.equals(operations, response.operations); + globalCheckpoint == response.globalCheckpoint && + Arrays.equals(operations, response.operations); } @Override public int hashCode() { int result = 1; result += Objects.hashCode(indexMetadataVersion); + result += Objects.hashCode(globalCheckpoint); result += Arrays.hashCode(operations); return result; } @@ -221,15 +231,12 @@ public TransportAction(Settings settings, protected Response shardOperation(Request request, ShardId shardId) throws IOException { IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex()); IndexShard indexShard = indexService.getShard(request.getShard().id()); + long globalCheckpoint = indexShard.getGlobalCheckpoint(); final long indexMetaDataVersion = clusterService.state().metaData().index(shardId.getIndex()).getVersion(); - // The following shard generates this request based on the global checkpoint on the primary copy on the leader. - // Although this value might not have been synced to all replica copies on the leader, the requesting range - // is guaranteed to be at most the local-checkpoint of any shard copies on the leader. - assert request.maxSeqNo <= indexShard.getLocalCheckpoint() : "invalid request from_seqno=[" + request.minSeqNo + "]," + - " to_seqno=[" + request.maxSeqNo + "], local_checkpoint=[" + indexShard.getLocalCheckpoint() + "]"; - final Translog.Operation[] operations = - getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo, request.maxTranslogsBytes); - return new Response(indexMetaDataVersion, operations); + + final Translog.Operation[] operations = getOperations(indexShard, globalCheckpoint, request.fromSeqNo, + request.maxOperationCount, request.maxOperationSizeInBytes); + return new Response(indexMetaDataVersion, globalCheckpoint, operations); } @Override @@ -254,23 +261,34 @@ protected Response newResponse() { private static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0]; - static Translog.Operation[] getOperationsBetween(IndexShard indexShard, long minSeqNo, long maxSeqNo, - long byteLimit) throws IOException { + /** + * Returns at most maxOperationCount operations from the specified from sequence number. + * This method will never return operations above the specified globalCheckpoint. + * + * Also if the sum of collected operations' size is above the specified maxOperationSizeInBytes then this method + * stops collecting more operations and returns what has been collected so far. + */ + static Translog.Operation[] getOperations(IndexShard indexShard, long globalCheckpoint, long fromSeqNo, int maxOperationCount, + long maxOperationSizeInBytes) throws IOException { if (indexShard.state() != IndexShardState.STARTED) { throw new IndexShardNotStartedException(indexShard.shardId(), indexShard.state()); } + if (fromSeqNo > indexShard.getGlobalCheckpoint()) { + return EMPTY_OPERATIONS_ARRAY; + } int seenBytes = 0; + long toSeqNo = Math.min(globalCheckpoint, fromSeqNo + maxOperationCount); final List operations = new ArrayList<>(); - try (Translog.Snapshot snapshot = indexShard.newLuceneChangesSnapshot("ccr", minSeqNo, maxSeqNo, true)) { + try (Translog.Snapshot snapshot = indexShard.newLuceneChangesSnapshot("ccr", fromSeqNo, toSeqNo, true)) { Translog.Operation op; while ((op = snapshot.next()) != null) { if (op.getSource() == null) { - throw new IllegalStateException("source not found for operation: " + op + " minSeqNo: " + minSeqNo + " maxSeqNo: " + - maxSeqNo); + throw new IllegalStateException("source not found for operation: " + op + " fromSeqNo: " + fromSeqNo + + " maxOperationCount: " + maxOperationCount); } operations.add(op); seenBytes += op.estimateSize(); - if (seenBytes > byteLimit) { + if (seenBytes > maxOperationSizeInBytes) { break; } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 8777fd0eabe95..026bb58d9db2a 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -5,73 +5,366 @@ */ package org.elasticsearch.xpack.ccr.action; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.transport.NetworkExceptionHelper; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.translog.Translog; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.atomic.AtomicLong; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.LongConsumer; -public class ShardFollowNodeTask extends AllocatedPersistentTask { +/** + * The node task that fetch the write operations from a leader shard and + * persists these ops in the follower shard. + */ +public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { + + public static final int DEFAULT_MAX_BATCH_OPERATION_COUNT = 1024; + public static final int DEFAULT_MAX_CONCURRENT_READ_BATCHES = 1; + public static final int DEFAULT_MAX_CONCURRENT_WRITE_BATCHES = 1; + public static final int DEFAULT_MAX_WRITE_BUFFER_SIZE = 10240; + public static final long DEFAULT_MAX_BATCH_SIZE_IN_BYTES = Long.MAX_VALUE; + private static final int RETRY_LIMIT = 10; + public static final TimeValue DEFAULT_RETRY_TIMEOUT = new TimeValue(500); + public static final TimeValue DEFAULT_IDLE_SHARD_RETRY_DELAY = TimeValue.timeValueSeconds(10); + + private static final Logger LOGGER = Loggers.getLogger(ShardFollowNodeTask.class); + + private final ShardFollowTask params; + private final TimeValue retryTimeout; + private final TimeValue idleShardChangesRequestDelay; + private final BiConsumer scheduler; - private final AtomicLong processedGlobalCheckpoint = new AtomicLong(); + private volatile long lastRequestedSeqno; + private volatile long leaderGlobalCheckpoint; - ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map headers) { + private volatile int numConcurrentReads = 0; + private volatile int numConcurrentWrites = 0; + private volatile long followerGlobalCheckpoint = 0; + private volatile long currentIndexMetadataVersion = 0; + private final AtomicInteger retryCounter = new AtomicInteger(0); + private final Queue buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo).reversed()); + + ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map headers, + ShardFollowTask params, BiConsumer scheduler, TimeValue idleShardChangesRequestDelay, + TimeValue retryTimeout) { super(id, type, action, description, parentTask, headers); + this.params = params; + this.scheduler = scheduler; + this.retryTimeout = retryTimeout; + this.idleShardChangesRequestDelay = idleShardChangesRequestDelay; + } + + void start(long followerGlobalCheckpoint) { + this.lastRequestedSeqno = followerGlobalCheckpoint; + this.followerGlobalCheckpoint = followerGlobalCheckpoint; + this.leaderGlobalCheckpoint = followerGlobalCheckpoint; + + // Forcefully updates follower mapping, this gets us the leader imd version and + // makes sure that leader and follower mapping are identical. + updateMapping(imdVersion -> { + currentIndexMetadataVersion = imdVersion; + LOGGER.info("{} Started to follow leader shard {}, followGlobalCheckPoint={}, indexMetaDataVersion={}", + params.getFollowShardId(), params.getLeaderShardId(), followerGlobalCheckpoint, imdVersion); + coordinateReads(); + }); + } + + private synchronized void coordinateReads() { + if (isStopped()) { + LOGGER.info("{} shard follow task has been stopped", params.getFollowShardId()); + return; + } + + LOGGER.trace("{} coordinate reads, lastRequestedSeqno={}, leaderGlobalCheckpoint={}", + params.getFollowShardId(), lastRequestedSeqno, leaderGlobalCheckpoint); + final int maxBatchOperationCount = params.getMaxBatchOperationCount(); + while (hasReadBudget() && lastRequestedSeqno < leaderGlobalCheckpoint) { + numConcurrentReads++; + long from = lastRequestedSeqno + 1; + long maxRequiredSeqno = Math.min(leaderGlobalCheckpoint, from + maxBatchOperationCount); + LOGGER.trace("{}[{}] read [{}/{}]", params.getFollowShardId(), numConcurrentReads, maxRequiredSeqno, maxBatchOperationCount); + sendShardChangesRequest(from, maxBatchOperationCount, maxRequiredSeqno); + lastRequestedSeqno = maxRequiredSeqno; + } + + if (numConcurrentReads == 0 && hasReadBudget()) { + assert lastRequestedSeqno == leaderGlobalCheckpoint; + // We sneak peek if there is any thing new in the leader. + // If there is we will happily accept + numConcurrentReads++; + long from = lastRequestedSeqno + 1; + LOGGER.trace("{}[{}] peek read [{}]", params.getFollowShardId(), numConcurrentReads, from); + sendShardChangesRequest(from, maxBatchOperationCount, lastRequestedSeqno); + } + } + + private boolean hasReadBudget() { + assert Thread.holdsLock(this); + if (numConcurrentReads >= params.getMaxConcurrentReadBatches()) { + LOGGER.trace("{} no new reads, maximum number of concurrent reads have been reached [{}]", + params.getFollowShardId(), numConcurrentReads); + return false; + } + if (buffer.size() > params.getMaxWriteBufferSize()) { + LOGGER.trace("{} no new reads, buffer limit has been reached [{}]", params.getFollowShardId(), buffer.size()); + return false; + } + return true; + } + + private synchronized void coordinateWrites() { + while (hasWriteBudget() && buffer.isEmpty() == false) { + long sumEstimatedSize = 0L; + int length = Math.min(params.getMaxBatchOperationCount(), buffer.size()); + List ops = new ArrayList<>(length); + for (int i = 0; i < length; i++) { + Translog.Operation op = buffer.remove(); + ops.add(op); + sumEstimatedSize += op.estimateSize(); + if (sumEstimatedSize > params.getMaxBatchSizeInBytes()) { + break; + } + } + numConcurrentWrites++; + LOGGER.trace("{}[{}] write [{}/{}] [{}]", params.getFollowShardId(), numConcurrentWrites, ops.get(0).seqNo(), + ops.get(ops.size() - 1).seqNo(), ops.size()); + sendBulkShardOperationsRequest(ops); + } + } + + private boolean hasWriteBudget() { + assert Thread.holdsLock(this); + if (numConcurrentWrites >= params.getMaxConcurrentWriteBatches()) { + LOGGER.trace("{} maximum number of concurrent writes have been reached [{}]", + params.getFollowShardId(), numConcurrentWrites); + return false; + } + return true; + } + + + private void sendShardChangesRequest(long from, int maxOperationCount, long maxRequiredSeqNo) { + innerSendShardChangesRequest(from, maxOperationCount, + response -> { + retryCounter.set(0); + handleReadResponse(from, maxRequiredSeqNo, response); + }, + e -> handleFailure(e, () -> sendShardChangesRequest(from, maxOperationCount, maxRequiredSeqNo))); + } + + private void handleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) { + maybeUpdateMapping(response.getIndexMetadataVersion(), () -> { + synchronized (ShardFollowNodeTask.this) { + leaderGlobalCheckpoint = Math.max(leaderGlobalCheckpoint, response.getGlobalCheckpoint()); + final long newMinRequiredSeqNo; + if (response.getOperations().length == 0) { + newMinRequiredSeqNo = from; + } else { + assert response.getOperations()[0].seqNo() == from : + "first operation is not what we asked for. From is [" + from + "], got " + response.getOperations()[0]; + buffer.addAll(Arrays.asList(response.getOperations())); + final long maxSeqNo = response.getOperations()[response.getOperations().length - 1].seqNo(); + assert maxSeqNo== + Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::seqNo).max().getAsLong(); + newMinRequiredSeqNo = maxSeqNo + 1; + // update last requested seq no as we may have gotten more than we asked for and we don't want to ask it again. + lastRequestedSeqno = Math.max(lastRequestedSeqno, maxSeqNo); + assert lastRequestedSeqno <= leaderGlobalCheckpoint : "lastRequestedSeqno [" + lastRequestedSeqno + + "] is larger than the global checkpoint [" + leaderGlobalCheckpoint + "]"; + coordinateWrites(); + } + + if (newMinRequiredSeqNo < maxRequiredSeqNo) { + int newSize = (int) (maxRequiredSeqNo - newMinRequiredSeqNo) + 1; + LOGGER.trace("{} received [{}] ops, still missing [{}/{}], continuing to read...", + params.getFollowShardId(), response.getOperations().length, newMinRequiredSeqNo, maxRequiredSeqNo); + sendShardChangesRequest(newMinRequiredSeqNo, newSize, maxRequiredSeqNo); + } else { + // read is completed, decrement + numConcurrentReads--; + if (response.getOperations().length == 0 && leaderGlobalCheckpoint == lastRequestedSeqno) { + // we got nothing and we have no reason to believe asking again well get us more, treat shard as idle and delay + // future requests + LOGGER.trace("{} received no ops and no known ops to fetch, scheduling to coordinate reads", + params.getFollowShardId()); + scheduler.accept(idleShardChangesRequestDelay, this::coordinateReads); + } else { + coordinateReads(); + } + } + } + }); + } + + private void sendBulkShardOperationsRequest(List operations) { + innerSendBulkShardOperationsRequest(operations, + followerLocalCheckpoint -> { + retryCounter.set(0); + handleWriteResponse(followerLocalCheckpoint); + }, + e -> handleFailure(e, () -> sendBulkShardOperationsRequest(operations)) + ); + } + + private synchronized void handleWriteResponse(long followerLocalCheckpoint) { + this.followerGlobalCheckpoint = Math.max(this.followerGlobalCheckpoint, followerLocalCheckpoint); + numConcurrentWrites--; + assert numConcurrentWrites >= 0; + coordinateWrites(); + + // In case that buffer has more ops than is allowed then reads may all have been stopped, + // this invocation makes sure that we start a read when there is budget in case no reads are being performed. + coordinateReads(); + } + + private synchronized void maybeUpdateMapping(Long minimumRequiredIndexMetadataVersion, Runnable task) { + if (currentIndexMetadataVersion >= minimumRequiredIndexMetadataVersion) { + LOGGER.trace("{} index metadata version [{}] is higher or equal than minimum required index metadata version [{}]", + params.getFollowShardId(), currentIndexMetadataVersion, minimumRequiredIndexMetadataVersion); + task.run(); + } else { + LOGGER.trace("{} updating mapping, index metadata version [{}] is lower than minimum required index metadata version [{}]", + params.getFollowShardId(), currentIndexMetadataVersion, minimumRequiredIndexMetadataVersion); + updateMapping(imdVersion -> { + retryCounter.set(0); + currentIndexMetadataVersion = imdVersion; + task.run(); + }); + } + } + + void handleFailure(Exception e, Runnable task) { + assert e != null; + if (shouldRetry(e)) { + if (isStopped() == false && retryCounter.incrementAndGet() <= RETRY_LIMIT) { + LOGGER.debug(new ParameterizedMessage("{} error during follow shard task, retrying...", params.getFollowShardId()), e); + scheduler.accept(retryTimeout, task); + } else { + markAsFailed(new ElasticsearchException("retrying failed [" + retryCounter.get() + + "] times, aborting...", e)); + } + } else { + markAsFailed(e); + } } + private boolean shouldRetry(Exception e) { + return NetworkExceptionHelper.isConnectException(e) || + NetworkExceptionHelper.isCloseConnectionException(e) || + TransportActions.isShardNotAvailableException(e); + } + + // These methods are protected for testing purposes: + protected abstract void updateMapping(LongConsumer handler); + + protected abstract void innerSendBulkShardOperationsRequest(List operations, LongConsumer handler, + Consumer errorHandler); + + protected abstract void innerSendShardChangesRequest(long from, int maxOperationCount, Consumer handler, + Consumer errorHandler); + @Override protected void onCancelled() { markAsCompleted(); } - public boolean isRunning() { - return isCancelled() == false && isCompleted() == false; - } - - void updateProcessedGlobalCheckpoint(long processedGlobalCheckpoint) { - this.processedGlobalCheckpoint.set(processedGlobalCheckpoint); + protected boolean isStopped() { + return isCancelled() || isCompleted(); } @Override - public Task.Status getStatus() { - return new Status(processedGlobalCheckpoint.get()); + public Status getStatus() { + return new Status(leaderGlobalCheckpoint, lastRequestedSeqno, followerGlobalCheckpoint, numConcurrentReads, numConcurrentWrites); } public static class Status implements Task.Status { public static final String NAME = "shard-follow-node-task-status"; - static final ParseField PROCESSED_GLOBAL_CHECKPOINT_FIELD = new ParseField("processed_global_checkpoint"); + static final ParseField LEADER_GLOBAL_CHECKPOINT_FIELD = new ParseField("leader_global_checkpoint"); + static final ParseField FOLLOWER_GLOBAL_CHECKPOINT_FIELD = new ParseField("follower_global_checkpoint"); + static final ParseField LAST_REQUESTED_SEQNO_FIELD = new ParseField("last_requested_seqno"); + static final ParseField NUMBER_OF_CONCURRENT_READS_FIELD = new ParseField("number_of_concurrent_reads"); + static final ParseField NUMBER_OF_CONCURRENT_WRITES_FIELD = new ParseField("number_of_concurrent_writes"); - static final ConstructingObjectParser PARSER = - new ConstructingObjectParser<>(NAME, args -> new Status((Long) args[0])); + static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, + args -> new Status((long) args[0], (long) args[1], (long) args[2], (int) args[3], (int) args[4])); static { - PARSER.declareLong(ConstructingObjectParser.constructorArg(), PROCESSED_GLOBAL_CHECKPOINT_FIELD); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_GLOBAL_CHECKPOINT_FIELD); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), LAST_REQUESTED_SEQNO_FIELD); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_GLOBAL_CHECKPOINT_FIELD); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_READS_FIELD); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_WRITES_FIELD); } - private final long processedGlobalCheckpoint; + private final long leaderGlobalCheckpoint; + private final long lastRequestedSeqno; + private final long followerGlobalCheckpoint; + private final int numberOfConcurrentReads; + private final int numberOfConcurrentWrites; - Status(long processedGlobalCheckpoint) { - this.processedGlobalCheckpoint = processedGlobalCheckpoint; + Status(long leaderGlobalCheckpoint, long lastRequestedSeqno, long followerGlobalCheckpoint, + int numberOfConcurrentReads, int numberOfConcurrentWrites) { + this.leaderGlobalCheckpoint = leaderGlobalCheckpoint; + this.lastRequestedSeqno = lastRequestedSeqno; + this.followerGlobalCheckpoint = followerGlobalCheckpoint; + this.numberOfConcurrentReads = numberOfConcurrentReads; + this.numberOfConcurrentWrites = numberOfConcurrentWrites; } public Status(StreamInput in) throws IOException { - this.processedGlobalCheckpoint = in.readZLong(); + this.leaderGlobalCheckpoint = in.readZLong(); + this.lastRequestedSeqno = in.readZLong(); + this.followerGlobalCheckpoint = in.readZLong(); + this.numberOfConcurrentReads = in.readVInt(); + this.numberOfConcurrentWrites = in.readVInt(); + } + + public long getLeaderGlobalCheckpoint() { + return leaderGlobalCheckpoint; + } + + public long getLastRequestedSeqno() { + return lastRequestedSeqno; + } + + public long getFollowerGlobalCheckpoint() { + return followerGlobalCheckpoint; + } + + public int getNumberOfConcurrentReads() { + return numberOfConcurrentReads; } - public long getProcessedGlobalCheckpoint() { - return processedGlobalCheckpoint; + public int getNumberOfConcurrentWrites() { + return numberOfConcurrentWrites; } @Override @@ -81,14 +374,22 @@ public String getWriteableName() { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeZLong(processedGlobalCheckpoint); + out.writeZLong(leaderGlobalCheckpoint); + out.writeZLong(lastRequestedSeqno); + out.writeZLong(followerGlobalCheckpoint); + out.writeVInt(numberOfConcurrentReads); + out.writeVInt(numberOfConcurrentWrites); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); { - builder.field(PROCESSED_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), processedGlobalCheckpoint); + builder.field(LEADER_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), leaderGlobalCheckpoint); + builder.field(FOLLOWER_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), followerGlobalCheckpoint); + builder.field(LAST_REQUESTED_SEQNO_FIELD.getPreferredName(), lastRequestedSeqno); + builder.field(NUMBER_OF_CONCURRENT_READS_FIELD.getPreferredName(), numberOfConcurrentReads); + builder.field(NUMBER_OF_CONCURRENT_WRITES_FIELD.getPreferredName(), numberOfConcurrentWrites); } builder.endObject(); return builder; @@ -103,12 +404,17 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Status status = (Status) o; - return processedGlobalCheckpoint == status.processedGlobalCheckpoint; + return leaderGlobalCheckpoint == status.leaderGlobalCheckpoint && + lastRequestedSeqno == status.lastRequestedSeqno && + followerGlobalCheckpoint == status.followerGlobalCheckpoint && + numberOfConcurrentReads == status.numberOfConcurrentReads && + numberOfConcurrentWrites == status.numberOfConcurrentWrites; } @Override public int hashCode() { - return Objects.hash(processedGlobalCheckpoint); + return Objects.hash(leaderGlobalCheckpoint, lastRequestedSeqno, followerGlobalCheckpoint, numberOfConcurrentReads, + numberOfConcurrentWrites); } public String toString() { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java index e97f8e7dc0c3c..114ec722f2b96 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java @@ -10,7 +10,9 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.shard.ShardId; @@ -40,15 +42,19 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { static final ParseField LEADER_SHARD_INDEX_UUID_FIELD = new ParseField("leader_shard_index_uuid"); static final ParseField LEADER_SHARD_SHARDID_FIELD = new ParseField("leader_shard_shard"); static final ParseField HEADERS = new ParseField("headers"); - public static final ParseField MAX_CHUNK_SIZE = new ParseField("max_chunk_size"); - public static final ParseField NUM_CONCURRENT_CHUNKS = new ParseField("max_concurrent_chunks"); - public static final ParseField PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST = new ParseField("processor_max_translog_bytes"); + public static final ParseField MAX_BATCH_OPERATION_COUNT = new ParseField("max_batch_operation_count"); + public static final ParseField MAX_CONCURRENT_READ_BATCHES = new ParseField("max_concurrent_read_batches"); + public static final ParseField MAX_BATCH_SIZE_IN_BYTES = new ParseField("max_batch_size_in_bytes"); + public static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches"); + public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size"); + public static final ParseField RETRY_TIMEOUT = new ParseField("retry_timeout"); + public static final ParseField IDLE_SHARD_RETRY_DELAY = new ParseField("idle_shard_retry_delay"); @SuppressWarnings("unchecked") - public static ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, + private static ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, (a) -> new ShardFollowTask((String) a[0], new ShardId((String) a[1], (String) a[2], (int) a[3]), - new ShardId((String) a[4], (String) a[5], (int) a[6]), (long) a[7], (int) a[8], (long) a[9], - (Map) a[10])); + new ShardId((String) a[4], (String) a[5], (int) a[6]), (int) a[7], (int) a[8], (long) a[9], + (int) a[10], (int) a[11], (TimeValue) a[12], (TimeValue) a[13], (Map) a[14])); static { PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), LEADER_CLUSTER_ALIAS_FIELD); @@ -58,28 +64,45 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_SHARD_INDEX_FIELD); PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_SHARD_INDEX_UUID_FIELD); PARSER.declareInt(ConstructingObjectParser.constructorArg(), LEADER_SHARD_SHARDID_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), MAX_CHUNK_SIZE); - PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUM_CONCURRENT_CHUNKS); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_BATCH_OPERATION_COUNT); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_CONCURRENT_READ_BATCHES); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), MAX_BATCH_SIZE_IN_BYTES); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_CONCURRENT_WRITE_BATCHES); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_WRITE_BUFFER_SIZE); + PARSER.declareField(ConstructingObjectParser.constructorArg(), + (p, c) -> TimeValue.parseTimeValue(p.text(), RETRY_TIMEOUT.getPreferredName()), + RETRY_TIMEOUT, ObjectParser.ValueType.STRING); + PARSER.declareField(ConstructingObjectParser.constructorArg(), + (p, c) -> TimeValue.parseTimeValue(p.text(), IDLE_SHARD_RETRY_DELAY.getPreferredName()), + IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING); PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), HEADERS); } private final String leaderClusterAlias; private final ShardId followShardId; private final ShardId leaderShardId; - private final long maxChunkSize; - private final int numConcurrentChunks; - private final long processorMaxTranslogBytes; + private final int maxBatchOperationCount; + private final int maxConcurrentReadBatches; + private final long maxBatchSizeInBytes; + private final int maxConcurrentWriteBatches; + private final int maxWriteBufferSize; + private final TimeValue retryTimeout; + private final TimeValue idleShardRetryDelay; private final Map headers; - ShardFollowTask(String leaderClusterAlias, ShardId followShardId, ShardId leaderShardId, long maxChunkSize, - int numConcurrentChunks, long processorMaxTranslogBytes, Map headers) { + ShardFollowTask(String leaderClusterAlias, ShardId followShardId, ShardId leaderShardId, int maxBatchOperationCount, + int maxConcurrentReadBatches, long maxBatchSizeInBytes, int maxConcurrentWriteBatches, + int maxWriteBufferSize, TimeValue retryTimeout, TimeValue idleShardRetryDelay, Map headers) { this.leaderClusterAlias = leaderClusterAlias; this.followShardId = followShardId; this.leaderShardId = leaderShardId; - this.maxChunkSize = maxChunkSize; - this.numConcurrentChunks = numConcurrentChunks; - this.processorMaxTranslogBytes = processorMaxTranslogBytes; + this.maxBatchOperationCount = maxBatchOperationCount; + this.maxConcurrentReadBatches = maxConcurrentReadBatches; + this.maxBatchSizeInBytes = maxBatchSizeInBytes; + this.maxConcurrentWriteBatches = maxConcurrentWriteBatches; + this.maxWriteBufferSize = maxWriteBufferSize; + this.retryTimeout = retryTimeout; + this.idleShardRetryDelay = idleShardRetryDelay; this.headers = headers != null ? Collections.unmodifiableMap(headers) : Collections.emptyMap(); } @@ -87,9 +110,13 @@ public ShardFollowTask(StreamInput in) throws IOException { this.leaderClusterAlias = in.readOptionalString(); this.followShardId = ShardId.readShardId(in); this.leaderShardId = ShardId.readShardId(in); - this.maxChunkSize = in.readVLong(); - this.numConcurrentChunks = in.readVInt(); - this.processorMaxTranslogBytes = in.readVLong(); + this.maxBatchOperationCount = in.readVInt(); + this.maxConcurrentReadBatches = in.readVInt(); + this.maxBatchSizeInBytes = in.readVLong(); + this.maxConcurrentWriteBatches = in.readVInt(); + this.maxWriteBufferSize = in.readVInt(); + this.retryTimeout = in.readTimeValue(); + this.idleShardRetryDelay = in.readTimeValue(); this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString)); } @@ -105,16 +132,32 @@ public ShardId getLeaderShardId() { return leaderShardId; } - public long getMaxChunkSize() { - return maxChunkSize; + public int getMaxBatchOperationCount() { + return maxBatchOperationCount; } - public int getNumConcurrentChunks() { - return numConcurrentChunks; + public int getMaxConcurrentReadBatches() { + return maxConcurrentReadBatches; } - public long getProcessorMaxTranslogBytes() { - return processorMaxTranslogBytes; + public int getMaxConcurrentWriteBatches() { + return maxConcurrentWriteBatches; + } + + public int getMaxWriteBufferSize() { + return maxWriteBufferSize; + } + + public long getMaxBatchSizeInBytes() { + return maxBatchSizeInBytes; + } + + public TimeValue getRetryTimeout() { + return retryTimeout; + } + + public TimeValue getIdleShardRetryDelay() { + return idleShardRetryDelay; } public Map getHeaders() { @@ -131,9 +174,13 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(leaderClusterAlias); followShardId.writeTo(out); leaderShardId.writeTo(out); - out.writeVLong(maxChunkSize); - out.writeVInt(numConcurrentChunks); - out.writeVLong(processorMaxTranslogBytes); + out.writeVLong(maxBatchOperationCount); + out.writeVInt(maxConcurrentReadBatches); + out.writeVLong(maxBatchSizeInBytes); + out.writeVInt(maxConcurrentWriteBatches); + out.writeVInt(maxWriteBufferSize); + out.writeTimeValue(retryTimeout); + out.writeTimeValue(idleShardRetryDelay); out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString); } @@ -153,9 +200,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(LEADER_SHARD_INDEX_FIELD.getPreferredName(), leaderShardId.getIndex().getName()); builder.field(LEADER_SHARD_INDEX_UUID_FIELD.getPreferredName(), leaderShardId.getIndex().getUUID()); builder.field(LEADER_SHARD_SHARDID_FIELD.getPreferredName(), leaderShardId.id()); - builder.field(MAX_CHUNK_SIZE.getPreferredName(), maxChunkSize); - builder.field(NUM_CONCURRENT_CHUNKS.getPreferredName(), numConcurrentChunks); - builder.field(PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST.getPreferredName(), processorMaxTranslogBytes); + builder.field(MAX_BATCH_OPERATION_COUNT.getPreferredName(), maxBatchOperationCount); + builder.field(MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches); + builder.field(MAX_BATCH_SIZE_IN_BYTES.getPreferredName(), maxBatchSizeInBytes); + builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches); + builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize); + builder.field(RETRY_TIMEOUT.getPreferredName(), retryTimeout.getStringRep()); + builder.field(IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep()); builder.field(HEADERS.getPreferredName(), headers); return builder.endObject(); } @@ -168,16 +219,20 @@ public boolean equals(Object o) { return Objects.equals(leaderClusterAlias, that.leaderClusterAlias) && Objects.equals(followShardId, that.followShardId) && Objects.equals(leaderShardId, that.leaderShardId) && - maxChunkSize == that.maxChunkSize && - numConcurrentChunks == that.numConcurrentChunks && - processorMaxTranslogBytes == that.processorMaxTranslogBytes && + maxBatchOperationCount == that.maxBatchOperationCount && + maxConcurrentReadBatches == that.maxConcurrentReadBatches && + maxConcurrentWriteBatches == that.maxConcurrentWriteBatches && + maxBatchSizeInBytes == that.maxBatchSizeInBytes && + maxWriteBufferSize == that.maxWriteBufferSize && + Objects.equals(retryTimeout, that.retryTimeout) && + Objects.equals(idleShardRetryDelay, that.idleShardRetryDelay) && Objects.equals(headers, that.headers); } @Override public int hashCode() { - return Objects.hash(leaderClusterAlias, followShardId, leaderShardId, maxChunkSize, numConcurrentChunks, - processorMaxTranslogBytes, headers); + return Objects.hash(leaderClusterAlias, followShardId, leaderShardId, maxBatchOperationCount, maxConcurrentReadBatches, + maxConcurrentWriteBatches, maxBatchSizeInBytes, maxWriteBufferSize, retryTimeout, idleShardRetryDelay, headers); } public String toString() { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 895f53846f65a..969e9e8bf9020 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -5,15 +5,12 @@ */ package org.elasticsearch.xpack.ccr.action; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; -import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.ShardStats; @@ -24,15 +21,11 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.NetworkExceptionHelper; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.common.util.concurrent.CountDown; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; -import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.persistent.AllocatedPersistentTask; @@ -44,18 +37,11 @@ import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest; -import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.LongConsumer; @@ -64,12 +50,6 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor { - static final long DEFAULT_BATCH_SIZE = 1024; - static final int PROCESSOR_RETRY_LIMIT = 16; - static final int DEFAULT_CONCURRENT_PROCESSORS = 1; - static final long DEFAULT_MAX_TRANSLOG_BYTES= Long.MAX_VALUE; - private static final TimeValue RETRY_TIMEOUT = TimeValue.timeValueMillis(500); - private final Client client; private final ThreadPool threadPool; @@ -100,84 +80,70 @@ public void validate(ShardFollowTask params, ClusterState clusterState) { protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId, PersistentTasksCustomMetaData.PersistentTask taskInProgress, Map headers) { - return new ShardFollowNodeTask(id, type, action, getDescription(taskInProgress), parentTaskId, headers); - } - - @Override - protected void nodeOperation(final AllocatedPersistentTask task, final ShardFollowTask params, final PersistentTaskState state) { - ShardFollowNodeTask shardFollowNodeTask = (ShardFollowNodeTask) task; - Client leaderClient = wrapClient(params.getLeaderClusterAlias() != null ? - this.client.getRemoteClusterClient(params.getLeaderClusterAlias()) : this.client, params); - Client followerClient = wrapClient(this.client, params); - IndexMetadataVersionChecker imdVersionChecker = new IndexMetadataVersionChecker(params.getLeaderShardId().getIndex(), - params.getFollowShardId().getIndex(), client, leaderClient); - logger.info("[{}] initial leader mapping with follower mapping syncing", params); - imdVersionChecker.updateMapping(1L /* Force update, version is initially 0L */, e -> { - if (e == null) { - logger.info("Starting shard following [{}]", params); - fetchGlobalCheckpoint(followerClient, params.getFollowShardId(), - followGlobalCheckPoint -> { - shardFollowNodeTask.updateProcessedGlobalCheckpoint(followGlobalCheckPoint); - prepare(leaderClient, followerClient, shardFollowNodeTask, params, followGlobalCheckPoint, imdVersionChecker); - }, task::markAsFailed); - } else { - shardFollowNodeTask.markAsFailed(e); - } - }); - } - - void prepare(Client leaderClient, Client followerClient, ShardFollowNodeTask task, ShardFollowTask params, - long followGlobalCheckPoint, - IndexMetadataVersionChecker imdVersionChecker) { - if (task.isRunning() == false) { - // TODO: need better cancellation control - return; + ShardFollowTask params = taskInProgress.getParams(); + final Client leaderClient; + if (params.getLeaderClusterAlias() != null) { + leaderClient = wrapClient(client.getRemoteClusterClient(params.getLeaderClusterAlias()), params); + } else { + leaderClient = wrapClient(client, params); } + Client followerClient = wrapClient(client, params); + BiConsumer scheduler = + (delay, command) -> threadPool.schedule(delay, Ccr.CCR_THREAD_POOL_NAME, command); + return new ShardFollowNodeTask(id, type, action, getDescription(taskInProgress), parentTaskId, headers, params, + scheduler, params.getIdleShardRetryDelay(), params.getRetryTimeout()) { - final ShardId leaderShard = params.getLeaderShardId(); - final ShardId followerShard = params.getFollowShardId(); - fetchGlobalCheckpoint(leaderClient, leaderShard, leaderGlobalCheckPoint -> { - // TODO: check if both indices have the same history uuid - if (leaderGlobalCheckPoint == followGlobalCheckPoint) { - logger.debug("{} no write operations to fetch", followerShard); - retry(leaderClient, followerClient, task, params, followGlobalCheckPoint, imdVersionChecker); - } else { - assert followGlobalCheckPoint < leaderGlobalCheckPoint : "followGlobalCheckPoint [" + followGlobalCheckPoint + - "] is not below leaderGlobalCheckPoint [" + leaderGlobalCheckPoint + "]"; - logger.debug("{} fetching write operations, leaderGlobalCheckPoint={}, followGlobalCheckPoint={}", followerShard, - leaderGlobalCheckPoint, followGlobalCheckPoint); - Executor ccrExecutor = threadPool.executor(Ccr.CCR_THREAD_POOL_NAME); - Consumer handler = e -> { - if (e == null) { - task.updateProcessedGlobalCheckpoint(leaderGlobalCheckPoint); - prepare(leaderClient, followerClient, task, params, leaderGlobalCheckPoint, imdVersionChecker); - } else { - task.markAsFailed(e); - } - }; - ChunksCoordinator coordinator = new ChunksCoordinator(followerClient, leaderClient, ccrExecutor, imdVersionChecker, - params.getMaxChunkSize(), params.getNumConcurrentChunks(), params.getProcessorMaxTranslogBytes(), leaderShard, - followerShard, handler); - coordinator.createChucks(followGlobalCheckPoint, leaderGlobalCheckPoint); - coordinator.start(); + @Override + protected void updateMapping(LongConsumer handler) { + Index leaderIndex = params.getLeaderShardId().getIndex(); + Index followIndex = params.getFollowShardId().getIndex(); + + ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.clear(); + clusterStateRequest.metaData(true); + clusterStateRequest.indices(leaderIndex.getName()); + + leaderClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> { + IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex); + assert indexMetaData.getMappings().size() == 1 : "expected exactly one mapping, but got [" + + indexMetaData.getMappings().size() + "]"; + MappingMetaData mappingMetaData = indexMetaData.getMappings().iterator().next().value; + + PutMappingRequest putMappingRequest = new PutMappingRequest(followIndex.getName()); + putMappingRequest.type(mappingMetaData.type()); + putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON); + followerClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap( + putMappingResponse -> handler.accept(indexMetaData.getVersion()), + e -> handleFailure(e, () -> updateMapping(handler)))); + }, e -> handleFailure(e, () -> updateMapping(handler)))); } - }, task::markAsFailed); - } - private void retry(Client leaderClient, Client followerClient, ShardFollowNodeTask task, ShardFollowTask params, - long followGlobalCheckPoint, - IndexMetadataVersionChecker imdVersionChecker) { - threadPool.schedule(RETRY_TIMEOUT, Ccr.CCR_THREAD_POOL_NAME, new AbstractRunnable() { @Override - public void onFailure(Exception e) { - task.markAsFailed(e); + protected void innerSendBulkShardOperationsRequest(List operations, LongConsumer handler, + Consumer errorHandler) { + final BulkShardOperationsRequest request = new BulkShardOperationsRequest(params.getFollowShardId(), operations); + followerClient.execute(BulkShardOperationsAction.INSTANCE, request, + ActionListener.wrap(response -> handler.accept(response.getGlobalCheckpoint()), errorHandler)); } @Override - protected void doRun() throws Exception { - prepare(leaderClient, followerClient, task, params, followGlobalCheckPoint, imdVersionChecker); + protected void innerSendShardChangesRequest(long from, int maxOperationCount, Consumer handler, + Consumer errorHandler) { + ShardChangesAction.Request request = new ShardChangesAction.Request(params.getLeaderShardId()); + request.setFromSeqNo(from); + request.setMaxOperationCount(maxOperationCount); + request.setMaxOperationSizeInBytes(params.getMaxBatchSizeInBytes()); + leaderClient.execute(ShardChangesAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler)); } - }); + }; + } + + @Override + protected void nodeOperation(final AllocatedPersistentTask task, final ShardFollowTask params, final PersistentTaskState state) { + Client followerClient = wrapClient(client, params); + ShardFollowNodeTask shardFollowNodeTask = (ShardFollowNodeTask) task; + logger.info("{} Started to track leader shard {}", params.getFollowShardId(), params.getLeaderShardId()); + fetchGlobalCheckpoint(followerClient, params.getFollowShardId(), shardFollowNodeTask::start, task::markAsFailed); } private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer handler, Consumer errorHandler) { @@ -197,229 +163,7 @@ private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer }, errorHandler)); } - static class ChunksCoordinator { - - private static final Logger LOGGER = Loggers.getLogger(ChunksCoordinator.class); - - private final Client followerClient; - private final Client leaderClient; - private final Executor ccrExecutor; - private final IndexMetadataVersionChecker imdVersionChecker; - - private final long batchSize; - private final int concurrentProcessors; - private final long processorMaxTranslogBytes; - private final ShardId leaderShard; - private final ShardId followerShard; - private final Consumer handler; - - private final CountDown countDown; - private final Queue chunks = new ConcurrentLinkedQueue<>(); - private final AtomicReference failureHolder = new AtomicReference<>(); - - ChunksCoordinator(Client followerClient, Client leaderClient, Executor ccrExecutor, IndexMetadataVersionChecker imdVersionChecker, - long batchSize, int concurrentProcessors, long processorMaxTranslogBytes, ShardId leaderShard, - ShardId followerShard, Consumer handler) { - this.followerClient = followerClient; - this.leaderClient = leaderClient; - this.ccrExecutor = ccrExecutor; - this.imdVersionChecker = imdVersionChecker; - this.batchSize = batchSize; - this.concurrentProcessors = concurrentProcessors; - this.processorMaxTranslogBytes = processorMaxTranslogBytes; - this.leaderShard = leaderShard; - this.followerShard = followerShard; - this.handler = handler; - this.countDown = new CountDown(concurrentProcessors); - } - - /** - * Creates chunks of the specified range, inclusive. - * - * @param from the lower end of the range (inclusive) - * @param to the upper end of the range (inclusive) - */ - void createChucks(final long from, final long to) { - LOGGER.debug("{} Creating chunks for operation range [{}] to [{}]", leaderShard, from, to); - for (long i = from; i < to; i += batchSize) { - long v2 = i + batchSize <= to ? i + batchSize - 1 : to; - chunks.add(new long[]{i, v2}); - } - } - - void start() { - LOGGER.debug("{} Start coordination of [{}] chunks with [{}] concurrent processors", - leaderShard, chunks.size(), concurrentProcessors); - for (int i = 0; i < concurrentProcessors; i++) { - ccrExecutor.execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - assert e != null; - LOGGER.error(() -> new ParameterizedMessage("{} Failure starting processor", leaderShard), e); - postProcessChuck(e); - } - - @Override - protected void doRun() throws Exception { - processNextChunk(); - } - }); - } - } - - void processNextChunk() { - long[] chunk = chunks.poll(); - if (chunk == null) { - postProcessChuck(null); - return; - } - LOGGER.debug("{} Processing chunk [{}/{}]", leaderShard, chunk[0], chunk[1]); - Consumer processorHandler = e -> { - if (e == null) { - LOGGER.debug("{} Successfully processed chunk [{}/{}]", leaderShard, chunk[0], chunk[1]); - processNextChunk(); - } else { - LOGGER.error(() -> new ParameterizedMessage("{} Failure processing chunk [{}/{}]", - leaderShard, chunk[0], chunk[1]), e); - postProcessChuck(e); - } - }; - ChunkProcessor processor = new ChunkProcessor(leaderClient, followerClient, chunks, ccrExecutor, imdVersionChecker, - leaderShard, followerShard, processorHandler); - processor.start(chunk[0], chunk[1], processorMaxTranslogBytes); - } - - void postProcessChuck(Exception e) { - if (failureHolder.compareAndSet(null, e) == false) { - Exception firstFailure = failureHolder.get(); - firstFailure.addSuppressed(e); - } - if (countDown.countDown()) { - handler.accept(failureHolder.get()); - } - } - - Queue getChunks() { - return chunks; - } - - } - - static class ChunkProcessor { - - private final Client leaderClient; - private final Client followerClient; - private final Queue chunks; - private final Executor ccrExecutor; - private final BiConsumer> indexVersionChecker; - - private final ShardId leaderShard; - private final ShardId followerShard; - private final Consumer handler; - final AtomicInteger retryCounter = new AtomicInteger(0); - - ChunkProcessor(Client leaderClient, Client followerClient, Queue chunks, Executor ccrExecutor, - BiConsumer> indexVersionChecker, - ShardId leaderShard, ShardId followerShard, Consumer handler) { - this.leaderClient = leaderClient; - this.followerClient = followerClient; - this.chunks = chunks; - this.ccrExecutor = ccrExecutor; - this.indexVersionChecker = indexVersionChecker; - this.leaderShard = leaderShard; - this.followerShard = followerShard; - this.handler = handler; - } - - void start(final long from, final long to, final long maxTranslogsBytes) { - ShardChangesAction.Request request = new ShardChangesAction.Request(leaderShard); - // Treat -1 as 0, because shard changes api min_seq_no is inclusive and therefore it doesn't allow a negative min_seq_no - // (If no indexing has happened in leader shard then global checkpoint is -1.) - request.setMinSeqNo(Math.max(0, from)); - request.setMaxSeqNo(to); - request.setMaxTranslogsBytes(maxTranslogsBytes); - leaderClient.execute(ShardChangesAction.INSTANCE, request, new ActionListener() { - @Override - public void onResponse(ShardChangesAction.Response response) { - handleResponse(to, response); - } - - @Override - public void onFailure(Exception e) { - assert e != null; - if (shouldRetry(e)) { - if (retryCounter.incrementAndGet() <= PROCESSOR_RETRY_LIMIT) { - start(from, to, maxTranslogsBytes); - } else { - handler.accept(new ElasticsearchException("retrying failed [" + retryCounter.get() + - "] times, aborting...", e)); - } - } else { - handler.accept(e); - } - } - }); - } - - void handleResponse(final long to, final ShardChangesAction.Response response) { - if (response.getOperations().length != 0) { - Translog.Operation lastOp = response.getOperations()[response.getOperations().length - 1]; - boolean maxByteLimitReached = lastOp.seqNo() < to; - if (maxByteLimitReached) { - // add a new entry to the queue for the operations that couldn't be fetched in the current shard changes api call: - chunks.add(new long[]{lastOp.seqNo() + 1, to}); - } - } - ccrExecutor.execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - assert e != null; - handler.accept(e); - } - - @Override - protected void doRun() throws Exception { - indexVersionChecker.accept(response.getIndexMetadataVersion(), e -> { - if (e != null) { - if (shouldRetry(e) && retryCounter.incrementAndGet() <= PROCESSOR_RETRY_LIMIT) { - handleResponse(to, response); - } else { - handler.accept(new ElasticsearchException("retrying failed [" + retryCounter.get() + - "] times, aborting...", e)); - } - return; - } - final BulkShardOperationsRequest request = new BulkShardOperationsRequest(followerShard, response.getOperations()); - followerClient.execute(BulkShardOperationsAction.INSTANCE, request, - new ActionListener() { - @Override - public void onResponse(final BulkShardOperationsResponse bulkShardOperationsResponse) { - handler.accept(null); - } - - @Override - public void onFailure(final Exception e) { - // No retry mechanism here, because if a failure is being redirected to this place it is considered - // non recoverable. - assert e != null; - handler.accept(e); - } - } - ); - }); - } - }); - } - - boolean shouldRetry(Exception e) { - // TODO: What other exceptions should be retried? - return NetworkExceptionHelper.isConnectException(e) || - NetworkExceptionHelper.isCloseConnectionException(e); - } - - } - - static Client wrapClient(Client client, ShardFollowTask shardFollowTask) { + private static Client wrapClient(Client client, ShardFollowTask shardFollowTask) { if (shardFollowTask.getHeaders().isEmpty()) { return client; } else { @@ -446,76 +190,4 @@ private static ThreadContext.StoredContext stashWithHeaders(ThreadContext thread return storedContext; } - static final class IndexMetadataVersionChecker implements BiConsumer> { - - private static final Logger LOGGER = Loggers.getLogger(IndexMetadataVersionChecker.class); - - private final Client followClient; - private final Client leaderClient; - private final Index leaderIndex; - private final Index followIndex; - private final AtomicLong currentIndexMetadataVersion; - private final Semaphore updateMappingSemaphore = new Semaphore(1); - - IndexMetadataVersionChecker(Index leaderIndex, Index followIndex, Client followClient, Client leaderClient) { - this.followClient = followClient; - this.leaderIndex = leaderIndex; - this.followIndex = followIndex; - this.leaderClient = leaderClient; - this.currentIndexMetadataVersion = new AtomicLong(); - } - - public void accept(Long minimumRequiredIndexMetadataVersion, Consumer handler) { - if (currentIndexMetadataVersion.get() >= minimumRequiredIndexMetadataVersion) { - LOGGER.debug("Current index metadata version [{}] is higher or equal than minimum required index metadata version [{}]", - currentIndexMetadataVersion.get(), minimumRequiredIndexMetadataVersion); - handler.accept(null); - } else { - updateMapping(minimumRequiredIndexMetadataVersion, handler); - } - } - - void updateMapping(long minimumRequiredIndexMetadataVersion, Consumer handler) { - try { - updateMappingSemaphore.acquire(); - } catch (InterruptedException e) { - handler.accept(e); - return; - } - if (currentIndexMetadataVersion.get() >= minimumRequiredIndexMetadataVersion) { - updateMappingSemaphore.release(); - LOGGER.debug("Current index metadata version [{}] is higher or equal than minimum required index metadata version [{}]", - currentIndexMetadataVersion.get(), minimumRequiredIndexMetadataVersion); - handler.accept(null); - return; - } - - ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); - clusterStateRequest.clear(); - clusterStateRequest.metaData(true); - clusterStateRequest.indices(leaderIndex.getName()); - - leaderClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> { - IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex); - assert indexMetaData.getMappings().size() == 1; - MappingMetaData mappingMetaData = indexMetaData.getMappings().iterator().next().value; - - PutMappingRequest putMappingRequest = new PutMappingRequest(followIndex.getName()); - putMappingRequest.type(mappingMetaData.type()); - putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON); - followClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap(putMappingResponse -> { - currentIndexMetadataVersion.set(indexMetaData.getVersion()); - updateMappingSemaphore.release(); - handler.accept(null); - }, e -> { - updateMappingSemaphore.release(); - handler.accept(e); - })); - }, e -> { - updateMappingSemaphore.release(); - handler.accept(e); - })); - } - } - } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequest.java index ef9d27ef919db..c28789fb580a8 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequest.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequest.java @@ -12,41 +12,44 @@ import org.elasticsearch.index.translog.Translog; import java.io.IOException; +import java.util.List; public final class BulkShardOperationsRequest extends ReplicatedWriteRequest { - private Translog.Operation[] operations; + private List operations; public BulkShardOperationsRequest() { - } - public BulkShardOperationsRequest(final ShardId shardId, final Translog.Operation[] operations) { + public BulkShardOperationsRequest(final ShardId shardId, final List operations) { super(shardId); setRefreshPolicy(RefreshPolicy.NONE); this.operations = operations; } - public Translog.Operation[] getOperations() { + public List getOperations() { return operations; } @Override public void readFrom(final StreamInput in) throws IOException { super.readFrom(in); - operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new); + operations = in.readList(Translog.Operation::readOperation); } @Override public void writeTo(final StreamOutput out) throws IOException { super.writeTo(out); - out.writeArray(Translog.Operation::writeOperation, operations); + out.writeVInt(operations.size()); + for (Translog.Operation operation : operations) { + Translog.Operation.writeOperation(out, operation); + } } @Override public String toString() { return "BulkShardOperationsRequest{" + - "operations=" + operations.length+ + "operations=" + operations.size()+ ", shardId=" + shardId + ", timeout=" + timeout + ", index='" + index + '\'' + diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsResponse.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsResponse.java index 62612e4bb4b3a..39cc85ffe71ed 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsResponse.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsResponse.java @@ -7,12 +7,39 @@ import org.elasticsearch.action.support.WriteResponse; import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; public final class BulkShardOperationsResponse extends ReplicationResponse implements WriteResponse { + private long globalCheckpoint; + + BulkShardOperationsResponse() { + } + + public long getGlobalCheckpoint() { + return globalCheckpoint; + } + + public void setGlobalCheckpoint(long globalCheckpoint) { + this.globalCheckpoint = globalCheckpoint; + } + @Override public void setForcedRefresh(final boolean forcedRefresh) { + } + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + globalCheckpoint = in.readZLong(); } + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeZLong(globalCheckpoint); + } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index 81cbf042037c1..0770c8713562e 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ccr.action.bulk; import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; @@ -24,7 +25,8 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; public class TransportBulkShardOperationsAction extends TransportWriteAction { @@ -62,10 +64,10 @@ protected WritePrimaryResult shardOperationOnPrimary( final ShardId shardId, - final Translog.Operation[] sourceOperations, + final List sourceOperations, final IndexShard primary, final Logger logger) throws IOException { - final Translog.Operation[] targetOperations = Arrays.stream(sourceOperations).map(operation -> { + final List targetOperations = sourceOperations.stream().map(operation -> { final Translog.Operation operationWithPrimaryTerm; switch (operation.opType()) { case INDEX: @@ -100,10 +102,10 @@ static WritePrimaryResult(replicaRequest, new BulkShardOperationsResponse(), location, null, primary, logger); + return new CcrWritePrimaryResult(replicaRequest, location, primary, logger); } @Override @@ -114,7 +116,7 @@ protected WriteReplicaResult shardOperationOnReplica } private static Translog.Location applyTranslogOperations( - final Translog.Operation[] operations, final IndexShard shard, final Engine.Operation.Origin origin) throws IOException { + final List operations, final IndexShard shard, final Engine.Operation.Origin origin) throws IOException { Translog.Location location = null; for (final Translog.Operation operation : operations) { final Engine.Result result = shard.applyTranslogOperation(operation, origin); @@ -122,7 +124,7 @@ private static Translog.Location applyTranslogOperations( assert result.getResultType() == Engine.Result.Type.SUCCESS; location = locationToSync(location, result.getTranslogLocation()); } - assert operations.length == 0 || location != null; + assert operations.size() == 0 || location != null; return location; } @@ -131,4 +133,23 @@ protected BulkShardOperationsResponse newResponseInstance() { return new BulkShardOperationsResponse(); } + /** + * Custom write result to include global checkpoint after ops have been replicated. + */ + static class CcrWritePrimaryResult extends WritePrimaryResult { + + CcrWritePrimaryResult(BulkShardOperationsRequest request, Translog.Location location, IndexShard primary, Logger logger) { + super(request, new BulkShardOperationsResponse(), location, null, primary, logger); + } + + @Override + public synchronized void respond(ActionListener listener) { + // Return a fresh global checkpoint after the operations have been replicated for the shard follow task: + BulkShardOperationsResponse response = finalResponseIfSuccessful; + response.setGlobalCheckpoint(primary.getGlobalCheckpoint()); + listener.onResponse(response); + } + + } + } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestCreateAndFollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestCreateAndFollowIndexAction.java index 22d3390671935..7683311f812e0 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestCreateAndFollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestCreateAndFollowIndexAction.java @@ -31,8 +31,7 @@ public String getName() { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { - Request request = new Request(); - request.setFollowRequest(RestFollowIndexAction.createRequest(restRequest)); + Request request = new Request(RestFollowIndexAction.createRequest(restRequest)); return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel)); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowIndexAction.java index 235308f902926..6804ce8ecb72c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowIndexAction.java @@ -7,10 +7,12 @@ import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask; import org.elasticsearch.xpack.ccr.action.ShardFollowTask; import java.io.IOException; @@ -37,19 +39,31 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient } static Request createRequest(RestRequest restRequest) { - Request request = new Request(); - request.setLeaderIndex(restRequest.param("leader_index")); - request.setFollowIndex(restRequest.param("index")); - if (restRequest.hasParam(ShardFollowTask.MAX_CHUNK_SIZE.getPreferredName())) { - request.setBatchSize(Long.valueOf(restRequest.param(ShardFollowTask.MAX_CHUNK_SIZE.getPreferredName()))); + int maxBatchOperationCount = ShardFollowNodeTask.DEFAULT_MAX_BATCH_OPERATION_COUNT; + if (restRequest.hasParam(ShardFollowTask.MAX_BATCH_OPERATION_COUNT.getPreferredName())) { + maxBatchOperationCount = Integer.valueOf(restRequest.param(ShardFollowTask.MAX_BATCH_OPERATION_COUNT.getPreferredName())); } - if (restRequest.hasParam(ShardFollowTask.NUM_CONCURRENT_CHUNKS.getPreferredName())) { - request.setConcurrentProcessors(Integer.valueOf(restRequest.param(ShardFollowTask.NUM_CONCURRENT_CHUNKS.getPreferredName()))); + int maxConcurrentReadBatches = ShardFollowNodeTask.DEFAULT_MAX_CONCURRENT_READ_BATCHES; + if (restRequest.hasParam(ShardFollowTask.MAX_CONCURRENT_READ_BATCHES.getPreferredName())) { + maxConcurrentReadBatches = Integer.valueOf(restRequest.param(ShardFollowTask.MAX_CONCURRENT_READ_BATCHES.getPreferredName())); } - if (restRequest.hasParam(ShardFollowTask.PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST.getPreferredName())) { - long value = Long.valueOf(restRequest.param(ShardFollowTask.PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST.getPreferredName())); - request.setProcessorMaxTranslogBytes(value); + long maxBatchSizeInBytes = ShardFollowNodeTask.DEFAULT_MAX_BATCH_SIZE_IN_BYTES; + if (restRequest.hasParam(ShardFollowTask.MAX_BATCH_SIZE_IN_BYTES.getPreferredName())) { + maxBatchSizeInBytes = Long.valueOf(restRequest.param(ShardFollowTask.MAX_BATCH_SIZE_IN_BYTES.getPreferredName())); } - return request; + int maxConcurrentWriteBatches = ShardFollowNodeTask.DEFAULT_MAX_CONCURRENT_WRITE_BATCHES; + if (restRequest.hasParam(ShardFollowTask.MAX_CONCURRENT_WRITE_BATCHES.getPreferredName())) { + maxConcurrentWriteBatches = Integer.valueOf(restRequest.param(ShardFollowTask.MAX_CONCURRENT_WRITE_BATCHES.getPreferredName())); + } + int maxWriteBufferSize = ShardFollowNodeTask.DEFAULT_MAX_WRITE_BUFFER_SIZE; + if (restRequest.hasParam(ShardFollowTask.MAX_WRITE_BUFFER_SIZE.getPreferredName())) { + maxWriteBufferSize = Integer.parseInt(restRequest.param(ShardFollowTask.MAX_WRITE_BUFFER_SIZE.getPreferredName())); + } + TimeValue retryTimeout = restRequest.paramAsTime(ShardFollowTask.RETRY_TIMEOUT.getPreferredName(), + ShardFollowNodeTask.DEFAULT_RETRY_TIMEOUT); + TimeValue idleShardRetryTimeout = restRequest.paramAsTime(ShardFollowTask.IDLE_SHARD_RETRY_DELAY.getPreferredName(), + ShardFollowNodeTask.DEFAULT_IDLE_SHARD_RETRY_DELAY); + return new Request(restRequest.param("leader_index"), restRequest.param("index"), maxBatchOperationCount, maxConcurrentReadBatches, + maxBatchSizeInBytes, maxConcurrentWriteBatches, maxWriteBufferSize, retryTimeout, idleShardRetryTimeout); } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java index ba9855b58736e..287a46f8eb026 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java @@ -9,13 +9,20 @@ import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.analysis.common.CommonAnalysisPlugin; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.support.XContentMapValues; @@ -24,6 +31,7 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.MockHttpTransport; @@ -37,6 +45,7 @@ import org.elasticsearch.xpack.core.XPackSettings; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -45,10 +54,14 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import static java.util.Collections.singletonMap; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -90,8 +103,8 @@ protected Collection> transportClientPlugins() { // this emulates what the CCR persistent task will do for pulling public void testGetOperationsBasedOnGlobalSequenceId() throws Exception { client().admin().indices().prepareCreate("index") - .setSettings(Settings.builder().put("index.number_of_shards", 1)) - .get(); + .setSettings(Settings.builder().put("index.number_of_shards", 1)) + .get(); client().prepareIndex("index", "doc", "1").setSource("{}", XContentType.JSON).get(); client().prepareIndex("index", "doc", "2").setSource("{}", XContentType.JSON).get(); @@ -102,8 +115,8 @@ public void testGetOperationsBasedOnGlobalSequenceId() throws Exception { assertThat(globalCheckPoint, equalTo(2L)); ShardChangesAction.Request request = new ShardChangesAction.Request(shardStats.getShardRouting().shardId()); - request.setMinSeqNo(0L); - request.setMaxSeqNo(globalCheckPoint); + request.setFromSeqNo(0L); + request.setMaxOperationCount(3); ShardChangesAction.Response response = client().execute(ShardChangesAction.INSTANCE, request).get(); assertThat(response.getOperations().length, equalTo(3)); Translog.Index operation = (Translog.Index) response.getOperations()[0]; @@ -127,8 +140,8 @@ public void testGetOperationsBasedOnGlobalSequenceId() throws Exception { assertThat(globalCheckPoint, equalTo(5L)); request = new ShardChangesAction.Request(shardStats.getShardRouting().shardId()); - request.setMinSeqNo(3L); - request.setMaxSeqNo(globalCheckPoint); + request.setFromSeqNo(3L); + request.setMaxOperationCount(3); response = client().execute(ShardChangesAction.INSTANCE, request).get(); assertThat(response.getOperations().length, equalTo(3)); operation = (Translog.Index) response.getOperations()[0]; @@ -147,16 +160,12 @@ public void testGetOperationsBasedOnGlobalSequenceId() throws Exception { public void testFollowIndex() throws Exception { final int numberOfPrimaryShards = randomIntBetween(1, 3); final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, - Collections.singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); - ensureGreen("index1"); + ensureYellow("index1"); - final FollowIndexAction.Request followRequest = new FollowIndexAction.Request(); - followRequest.setLeaderIndex("index1"); - followRequest.setFollowIndex("index2"); - - final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(); - createAndFollowRequest.setFollowRequest(followRequest); + final FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2"); + final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest); client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get(); final int firstBatchNumDocs = randomIntBetween(2, 64); @@ -209,16 +218,12 @@ public void testFollowIndex() throws Exception { public void testSyncMappings() throws Exception { final String leaderIndexSettings = getIndexSettings(2, - Collections.singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); - ensureGreen("index1"); - - final FollowIndexAction.Request followRequest = new FollowIndexAction.Request(); - followRequest.setLeaderIndex("index1"); - followRequest.setFollowIndex("index2"); + ensureYellow("index1"); - final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(); - createAndFollowRequest.setFollowRequest(followRequest); + final FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2"); + final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest); client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get(); final long firstBatchNumDocs = randomIntBetween(2, 64); @@ -229,7 +234,7 @@ public void testSyncMappings() throws Exception { assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(firstBatchNumDocs))); MappingMetaData mappingMetaData = client().admin().indices().prepareGetMappings("index2").get().getMappings() - .get("index2").get("doc"); + .get("index2").get("doc"); assertThat(XContentMapValues.extractValue("properties.f.type", mappingMetaData.sourceAsMap()), equalTo("integer")); assertThat(XContentMapValues.extractValue("properties.k", mappingMetaData.sourceAsMap()), nullValue()); @@ -240,48 +245,136 @@ public void testSyncMappings() throws Exception { } assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, - equalTo(firstBatchNumDocs + secondBatchNumDocs))); + equalTo(firstBatchNumDocs + secondBatchNumDocs))); mappingMetaData = client().admin().indices().prepareGetMappings("index2").get().getMappings() - .get("index2").get("doc"); + .get("index2").get("doc"); assertThat(XContentMapValues.extractValue("properties.f.type", mappingMetaData.sourceAsMap()), equalTo("integer")); assertThat(XContentMapValues.extractValue("properties.k.type", mappingMetaData.sourceAsMap()), equalTo("long")); + unfollowIndex("index2"); + } - final UnfollowIndexAction.Request unfollowRequest = new UnfollowIndexAction.Request(); - unfollowRequest.setFollowIndex("index2"); - client().execute(UnfollowIndexAction.INSTANCE, unfollowRequest).get(); + public void testFollowIndex_backlog() throws Exception { + String leaderIndexSettings = getIndexSettings(3, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); + BulkProcessor.Listener listener = new BulkProcessor.Listener() { + @Override + public void beforeBulk(long executionId, BulkRequest request) {} - assertBusy(() -> { - final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - assertThat(tasks.tasks().size(), equalTo(0)); + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {} - ListTasksRequest listTasksRequest = new ListTasksRequest(); - listTasksRequest.setDetailed(true); - ListTasksResponse listTasksResponse = client().admin().cluster().listTasks(listTasksRequest).get(); - int numNodeTasks = 0; - for (TaskInfo taskInfo : listTasksResponse.getTasks()) { - if (taskInfo.getAction().startsWith(ListTasksAction.NAME) == false) { - numNodeTasks++; + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) {} + }; + BulkProcessor bulkProcessor = BulkProcessor.builder(client(), listener) + .setBulkActions(100) + .setConcurrentRequests(4) + .build(); + AtomicBoolean run = new AtomicBoolean(true); + Thread thread = new Thread(() -> { + int counter = 0; + while (run.get()) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", counter++); + IndexRequest indexRequest = new IndexRequest("index1", "doc") + .source(source, XContentType.JSON) + .timeout(TimeValue.timeValueSeconds(1)); + bulkProcessor.add(indexRequest); + } + }); + thread.start(); + + // Waiting for some document being index before following the index: + int maxReadSize = randomIntBetween(128, 2048); + long numDocsIndexed = Math.min(3000 * 2, randomLongBetween(maxReadSize, maxReadSize * 10)); + atLeastDocsIndexed("index1", numDocsIndexed / 3); + + final FollowIndexAction.Request followRequest = new FollowIndexAction.Request("index1", "index2", maxReadSize, + randomIntBetween(2, 10), Long.MAX_VALUE, randomIntBetween(2, 10), + randomIntBetween(1024, 10240), TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10)); + CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest); + client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get(); + + atLeastDocsIndexed("index1", numDocsIndexed); + run.set(false); + thread.join(); + assertThat(bulkProcessor.awaitClose(1L, TimeUnit.MINUTES), is(true)); + + assertSameDocCount("index1", "index2"); + unfollowIndex("index2"); + } + + public void testFollowIndexAndCloseNode() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(3); + String leaderIndexSettings = getIndexSettings(3, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); + + String followerIndexSettings = getIndexSettings(3, singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true")); + assertAcked(client().admin().indices().prepareCreate("index2").setSource(followerIndexSettings, XContentType.JSON)); + ensureGreen("index1", "index2"); + + AtomicBoolean run = new AtomicBoolean(true); + Thread thread = new Thread(() -> { + int counter = 0; + while (run.get()) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", counter++); + try { + client().prepareIndex("index1", "doc") + .setSource(source, XContentType.JSON) + .setTimeout(TimeValue.timeValueSeconds(1)) + .get(); + } catch (Exception e) { + logger.error("Error while indexing into leader index", e); } } - assertThat(numNodeTasks, equalTo(0)); }); + thread.start(); + + final FollowIndexAction.Request followRequest = new FollowIndexAction.Request("index1", "index2", randomIntBetween(32, 2048), + randomIntBetween(2, 10), Long.MAX_VALUE, randomIntBetween(2, 10), + ShardFollowNodeTask.DEFAULT_MAX_WRITE_BUFFER_SIZE, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10)); + client().execute(FollowIndexAction.INSTANCE, followRequest).get(); + + long maxNumDocsReplicated = Math.min(3000, randomLongBetween(followRequest.getMaxBatchOperationCount(), + followRequest.getMaxBatchOperationCount() * 10)); + long minNumDocsReplicated = maxNumDocsReplicated / 3L; + logger.info("waiting for at least [{}] documents to be indexed and then stop a random data node", minNumDocsReplicated); + awaitBusy(() -> { + SearchRequest request = new SearchRequest("index2"); + request.source(new SearchSourceBuilder().size(0)); + SearchResponse response = client().search(request).actionGet(); + if (response.getHits().getTotalHits() >= minNumDocsReplicated) { + try { + internalCluster().stopRandomNonMasterNode(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return true; + } else { + return false; + } + }, 30, TimeUnit.SECONDS); + + logger.info("waiting for at least [{}] documents to be indexed", maxNumDocsReplicated); + atLeastDocsIndexed("index2", maxNumDocsReplicated); + run.set(false); + thread.join(); + + assertSameDocCount("index1", "index2"); + unfollowIndex("index2"); } public void testFollowIndexWithNestedField() throws Exception { final String leaderIndexSettings = - getIndexSettingsWithNestedMapping(1, Collections.singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + getIndexSettingsWithNestedMapping(1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); final String followerIndexSettings = - getIndexSettingsWithNestedMapping(1, Collections.singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true")); + getIndexSettingsWithNestedMapping(1, singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true")); assertAcked(client().admin().indices().prepareCreate("index2").setSource(followerIndexSettings, XContentType.JSON)); ensureGreen("index1", "index2"); - final FollowIndexAction.Request followRequest = new FollowIndexAction.Request(); - followRequest.setLeaderIndex("index1"); - followRequest.setFollowIndex("index2"); + final FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2"); client().execute(FollowIndexAction.INSTANCE, followRequest).get(); final int numDocs = randomIntBetween(2, 64); @@ -311,16 +404,7 @@ public void testFollowIndexWithNestedField() throws Exception { equalTo(Collections.singletonList(value))); }); } - - final UnfollowIndexAction.Request unfollowRequest = new UnfollowIndexAction.Request(); - unfollowRequest.setFollowIndex("index2"); - client().execute(UnfollowIndexAction.INSTANCE, unfollowRequest).get(); - - assertBusy(() -> { - final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - assertThat(tasks.tasks().size(), equalTo(0)); - }); + unfollowIndex("index2"); } public void testUnfollowNonExistingIndex() { @@ -332,19 +416,48 @@ public void testUnfollowNonExistingIndex() { public void testFollowNonExistentIndex() throws Exception { assertAcked(client().admin().indices().prepareCreate("test-leader").get()); assertAcked(client().admin().indices().prepareCreate("test-follower").get()); - final FollowIndexAction.Request followRequest = new FollowIndexAction.Request(); // Leader index does not exist. - followRequest.setLeaderIndex("non-existent-leader"); - followRequest.setFollowIndex("test-follower"); - expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest).actionGet()); + FollowIndexAction.Request followRequest1 = createFollowRequest("non-existent-leader", "test-follower"); + expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest1).actionGet()); // Follower index does not exist. - followRequest.setLeaderIndex("test-leader"); - followRequest.setFollowIndex("non-existent-follower"); - expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest).actionGet()); + FollowIndexAction.Request followRequest2 = createFollowRequest("non-test-leader", "non-existent-follower"); + expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest2).actionGet()); // Both indices do not exist. - followRequest.setLeaderIndex("non-existent-leader"); - followRequest.setFollowIndex("non-existent-follower"); - expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest).actionGet()); + FollowIndexAction.Request followRequest3 = createFollowRequest("non-existent-leader", "non-existent-follower"); + expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest3).actionGet()); + } + + public void testFollowIndex_lowMaxTranslogBytes() throws Exception { + final String leaderIndexSettings = getIndexSettings(1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); + ensureYellow("index1"); + + final int numDocs = 1024; + logger.info("Indexing [{}] docs", numDocs); + for (int i = 0; i < numDocs; i++) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); + client().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); + } + + final FollowIndexAction.Request followRequest = new FollowIndexAction.Request("index1", "index2", 1024, 1, 1024, + 1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10)); + final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest); + client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get(); + + final Map firstBatchNumDocsPerShard = new HashMap<>(); + final ShardStats[] firstBatchShardStats = client().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); + for (final ShardStats shardStats : firstBatchShardStats) { + if (shardStats.getShardRouting().primary()) { + long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1; + firstBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value); + } + } + + assertBusy(assertTask(1, firstBatchNumDocsPerShard)); + for (int i = 0; i < numDocs; i++) { + assertBusy(assertExpectedDocumentRunnable(i)); + } + unfollowIndex("index2"); } private CheckedRunnable assertTask(final int numberOfPrimaryShards, final Map numDocsPerShard) { @@ -362,7 +475,7 @@ private CheckedRunnable assertTask(final int numberOfPrimaryShards, f List taskInfos = listTasksResponse.getTasks(); assertThat(taskInfos.size(), equalTo(numberOfPrimaryShards)); Collection> shardFollowTasks = - taskMetadata.findTasks(ShardFollowTask.NAME, Objects::nonNull); + taskMetadata.findTasks(ShardFollowTask.NAME, Objects::nonNull); for (PersistentTasksCustomMetaData.PersistentTask shardFollowTask : shardFollowTasks) { final ShardFollowTask shardFollowTaskParams = (ShardFollowTask) shardFollowTask.getParams(); TaskInfo taskInfo = null; @@ -376,9 +489,9 @@ private CheckedRunnable assertTask(final int numberOfPrimaryShards, f assertThat(taskInfo, notNullValue()); ShardFollowNodeTask.Status status = (ShardFollowNodeTask.Status) taskInfo.getStatus(); assertThat(status, notNullValue()); - assertThat( - status.getProcessedGlobalCheckpoint(), - equalTo(numDocsPerShard.get(shardFollowTaskParams.getLeaderShardId()))); + assertThat("incorrect global checkpoint " + shardFollowTaskParams, + status.getFollowerGlobalCheckpoint(), + equalTo(numDocsPerShard.get(shardFollowTaskParams.getLeaderShardId()))); } }; } @@ -422,6 +535,7 @@ private String getIndexSettings(final int numberOfPrimaryShards, final Map additionalSetting : additionalIndexSettings.entrySet()) { builder.field(additionalSetting.getKey(), additionalSetting.getValue()); } @@ -502,4 +616,36 @@ private String getIndexSettingsWithNestedMapping(final int numberOfPrimaryShards } return settings; } + + private void atLeastDocsIndexed(String index, long numDocsReplicated) throws InterruptedException { + logger.info("waiting for at least [{}] documents to be indexed into index [{}]", numDocsReplicated, index); + awaitBusy(() -> { + refresh(index); + SearchRequest request = new SearchRequest(index); + request.source(new SearchSourceBuilder().size(0)); + SearchResponse response = client().search(request).actionGet(); + return response.getHits().getTotalHits() >= numDocsReplicated; + }, 30, TimeUnit.SECONDS); + } + + private void assertSameDocCount(String index1, String index2) throws Exception { + refresh(index1); + SearchRequest request1 = new SearchRequest(index1); + request1.source(new SearchSourceBuilder().size(0)); + SearchResponse response1 = client().search(request1).actionGet(); + assertBusy(() -> { + refresh(index2); + SearchRequest request2 = new SearchRequest(index2); + request2.source(new SearchSourceBuilder().size(0)); + SearchResponse response2 = client().search(request2).actionGet(); + assertThat(response2.getHits().getTotalHits(), equalTo(response1.getHits().getTotalHits())); + }); + } + + public static FollowIndexAction.Request createFollowRequest(String leaderIndex, String followIndex) { + return new FollowIndexAction.Request(leaderIndex, followIndex, ShardFollowNodeTask.DEFAULT_MAX_BATCH_OPERATION_COUNT, + ShardFollowNodeTask.DEFAULT_MAX_CONCURRENT_READ_BATCHES, ShardFollowNodeTask.DEFAULT_MAX_BATCH_SIZE_IN_BYTES, + ShardFollowNodeTask.DEFAULT_MAX_CONCURRENT_WRITE_BATCHES, ShardFollowNodeTask.DEFAULT_MAX_WRITE_BUFFER_SIZE, + TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10)); + } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java deleted file mode 100644 index f5104c537f652..0000000000000 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java +++ /dev/null @@ -1,409 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ccr.action; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.client.AdminClient; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.ClusterAdminClient; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.translog.Translog; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.ChunkProcessor; -import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.ChunksCoordinator; -import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.IndexMetadataVersionChecker; -import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction; -import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest; -import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; - -import java.net.ConnectException; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; -import java.util.function.Consumer; - -import static org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.PROCESSOR_RETRY_LIMIT; -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.CoreMatchers.nullValue; -import static org.hamcrest.CoreMatchers.sameInstance; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.same; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -public class ChunksCoordinatorTests extends ESTestCase { - - public void testCreateChunks() { - Client client = mock(Client.class); - Executor ccrExecutor = Runnable::run; - ShardId leaderShardId = new ShardId("index1", "index1", 0); - ShardId followShardId = new ShardId("index2", "index1", 0); - - IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(), - followShardId.getIndex(), client, client); - ChunksCoordinator coordinator = new ChunksCoordinator(client, client, ccrExecutor, checker, 1024, 1, - Long.MAX_VALUE, leaderShardId, followShardId, e -> {}); - coordinator.createChucks(0, 1023); - List result = new ArrayList<>(coordinator.getChunks()); - assertThat(result.size(), equalTo(1)); - assertThat(result.get(0)[0], equalTo(0L)); - assertThat(result.get(0)[1], equalTo(1023L)); - - coordinator.getChunks().clear(); - coordinator.createChucks(0, 2047); - result = new ArrayList<>(coordinator.getChunks()); - assertThat(result.size(), equalTo(2)); - assertThat(result.get(0)[0], equalTo(0L)); - assertThat(result.get(0)[1], equalTo(1023L)); - assertThat(result.get(1)[0], equalTo(1024L)); - assertThat(result.get(1)[1], equalTo(2047L)); - - coordinator.getChunks().clear(); - coordinator.createChucks(0, 4095); - result = new ArrayList<>(coordinator.getChunks()); - assertThat(result.size(), equalTo(4)); - assertThat(result.get(0)[0], equalTo(0L)); - assertThat(result.get(0)[1], equalTo(1023L)); - assertThat(result.get(1)[0], equalTo(1024L)); - assertThat(result.get(1)[1], equalTo(2047L)); - assertThat(result.get(2)[0], equalTo(2048L)); - assertThat(result.get(2)[1], equalTo(3071L)); - assertThat(result.get(3)[0], equalTo(3072L)); - assertThat(result.get(3)[1], equalTo(4095L)); - - coordinator.getChunks().clear(); - coordinator.createChucks(4096, 8196); - result = new ArrayList<>(coordinator.getChunks()); - assertThat(result.size(), equalTo(5)); - assertThat(result.get(0)[0], equalTo(4096L)); - assertThat(result.get(0)[1], equalTo(5119L)); - assertThat(result.get(1)[0], equalTo(5120L)); - assertThat(result.get(1)[1], equalTo(6143L)); - assertThat(result.get(2)[0], equalTo(6144L)); - assertThat(result.get(2)[1], equalTo(7167L)); - assertThat(result.get(3)[0], equalTo(7168L)); - assertThat(result.get(3)[1], equalTo(8191L)); - assertThat(result.get(4)[0], equalTo(8192L)); - assertThat(result.get(4)[1], equalTo(8196L)); - } - - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/31581") - public void testCoordinator() throws Exception { - Client client = createClientMock(); - - mockShardChangesApiCall(client); - mockBulkShardOperationsApiCall(client); - Executor ccrExecutor = Runnable::run; - ShardId leaderShardId = new ShardId("index1", "index1", 0); - ShardId followShardId = new ShardId("index2", "index1", 0); - - Consumer handler = e -> assertThat(e, nullValue()); - int concurrentProcessors = randomIntBetween(1, 4); - int batchSize = randomIntBetween(1, 1000); - IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(), - followShardId.getIndex(), client, client); - ChunksCoordinator coordinator = new ChunksCoordinator(client, client, ccrExecutor, checker, batchSize, - concurrentProcessors, Long.MAX_VALUE, leaderShardId, followShardId, handler); - - int numberOfOps = randomIntBetween(batchSize, batchSize * 20); - long from = randomInt(1000); - long to = from + numberOfOps - 1; - coordinator.createChucks(from, to); - int expectedNumberOfChunks = numberOfOps / batchSize; - if (numberOfOps % batchSize > 0) { - expectedNumberOfChunks++; - } - assertThat(coordinator.getChunks().size(), equalTo(expectedNumberOfChunks)); - - coordinator.start(); - assertThat(coordinator.getChunks().size(), equalTo(0)); - verify(client, times(expectedNumberOfChunks)).execute(same(ShardChangesAction.INSTANCE), - any(ShardChangesAction.Request.class), any()); - verify(client, times(expectedNumberOfChunks)).execute(same(BulkShardOperationsAction.INSTANCE), - any(BulkShardOperationsRequest.class), any()); - } - - public void testCoordinator_failure() throws Exception { - Exception expectedException = new RuntimeException("throw me"); - Client client = createClientMock(); - boolean shardChangesActionApiCallFailed; - if (randomBoolean()) { - shardChangesActionApiCallFailed = true; - doThrow(expectedException).when(client).execute(same(ShardChangesAction.INSTANCE), - any(ShardChangesAction.Request.class), any()); - } else { - shardChangesActionApiCallFailed = false; - mockShardChangesApiCall(client); - doThrow(expectedException).when(client).execute(same(BulkShardOperationsAction.INSTANCE), - any(BulkShardOperationsRequest.class), any()); - } - Executor ccrExecutor = Runnable::run; - ShardId leaderShardId = new ShardId("index1", "index1", 0); - ShardId followShardId = new ShardId("index2", "index1", 0); - - Consumer handler = e -> { - assertThat(e, notNullValue()); - assertThat(e, sameInstance(expectedException)); - }; - IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(), - followShardId.getIndex(), client, client); - ChunksCoordinator coordinator = new ChunksCoordinator(client, client, ccrExecutor, checker, 10, 1, Long.MAX_VALUE, - leaderShardId, followShardId, handler); - coordinator.createChucks(0, 19); - assertThat(coordinator.getChunks().size(), equalTo(2)); - - coordinator.start(); - assertThat(coordinator.getChunks().size(), equalTo(1)); - verify(client, times(1)).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class), - any()); - verify(client, times(shardChangesActionApiCallFailed ? 0 : 1)).execute(same(BulkShardOperationsAction.INSTANCE), - any(BulkShardOperationsRequest.class), any()); - } - - public void testCoordinator_concurrent() throws Exception { - Client client = createClientMock(); - mockShardChangesApiCall(client); - mockBulkShardOperationsApiCall(client); - Executor ccrExecutor = command -> new Thread(command).start(); - ShardId leaderShardId = new ShardId("index1", "index1", 0); - ShardId followShardId = new ShardId("index2", "index1", 0); - - AtomicBoolean calledOnceChecker = new AtomicBoolean(false); - AtomicReference failureHolder = new AtomicReference<>(); - CountDownLatch latch = new CountDownLatch(1); - Consumer handler = e -> { - if (failureHolder.compareAndSet(null, e) == false) { - // This handler should only be called once, irregardless of the number of concurrent processors - calledOnceChecker.set(true); - } - latch.countDown(); - }; - IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(), - followShardId.getIndex(), client, client); - ChunksCoordinator coordinator = new ChunksCoordinator(client, client, ccrExecutor, checker, 1000, 4, Long.MAX_VALUE, - leaderShardId, followShardId, handler); - coordinator.createChucks(0, 999999); - assertThat(coordinator.getChunks().size(), equalTo(1000)); - - coordinator.start(); - latch.await(); - assertThat(coordinator.getChunks().size(), equalTo(0)); - verify(client, times(1000)).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class), any()); - verify(client, times(1000)).execute(same(BulkShardOperationsAction.INSTANCE), any(BulkShardOperationsRequest.class), any()); - assertThat(calledOnceChecker.get(), is(false)); - } - - public void testChunkProcessor() { - Client client = createClientMock(); - Queue chunks = new LinkedList<>(); - mockShardChangesApiCall(client); - mockBulkShardOperationsApiCall(client); - Executor ccrExecutor = Runnable::run; - ShardId leaderShardId = new ShardId("index1", "index1", 0); - ShardId followShardId = new ShardId("index2", "index1", 0); - IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(), - followShardId.getIndex(), client, client); - - boolean[] invoked = new boolean[1]; - Exception[] exception = new Exception[1]; - Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; - ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, checker, leaderShardId, - followShardId, handler); - chunkProcessor.start(0, 10, Long.MAX_VALUE); - assertThat(invoked[0], is(true)); - assertThat(exception[0], nullValue()); - } - - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/31581") - public void testChunkProcessorRetry() { - Client client = createClientMock(); - Queue chunks = new LinkedList<>(); - mockBulkShardOperationsApiCall(client); - int testRetryLimit = randomIntBetween(1, PROCESSOR_RETRY_LIMIT - 1); - mockShardCangesApiCallWithRetry(client, testRetryLimit, new ConnectException("connection exception")); - - Executor ccrExecutor = Runnable::run; - ShardId leaderShardId = new ShardId("index1", "index1", 0); - ShardId followShardId = new ShardId("index2", "index1", 0); - IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(), - followShardId.getIndex(), client, client); - - boolean[] invoked = new boolean[1]; - Exception[] exception = new Exception[1]; - Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; - ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, checker, leaderShardId, - followShardId, handler); - chunkProcessor.start(0, 10, Long.MAX_VALUE); - assertThat(invoked[0], is(true)); - assertThat(exception[0], nullValue()); - assertThat(chunkProcessor.retryCounter.get(), equalTo(testRetryLimit + 1)); - } - - public void testChunkProcessorRetryTooManyTimes() { - Client client = createClientMock(); - Queue chunks = new LinkedList<>(); - mockBulkShardOperationsApiCall(client); - int testRetryLimit = PROCESSOR_RETRY_LIMIT + 1; - mockShardCangesApiCallWithRetry(client, testRetryLimit, new ConnectException("connection exception")); - - Executor ccrExecutor = Runnable::run; - ShardId leaderShardId = new ShardId("index1", "index1", 0); - ShardId followShardId = new ShardId("index2", "index1", 0); - IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(), - followShardId.getIndex(), client, client); - - boolean[] invoked = new boolean[1]; - Exception[] exception = new Exception[1]; - Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; - ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, checker, leaderShardId, - followShardId, handler); - chunkProcessor.start(0, 10, Long.MAX_VALUE); - assertThat(invoked[0], is(true)); - assertThat(exception[0], notNullValue()); - assertThat(exception[0].getMessage(), equalTo("retrying failed [17] times, aborting...")); - assertThat(exception[0].getCause().getMessage(), equalTo("connection exception")); - assertThat(chunkProcessor.retryCounter.get(), equalTo(testRetryLimit)); - } - - public void testChunkProcessorNoneRetryableError() { - Client client = createClientMock(); - Queue chunks = new LinkedList<>(); - mockBulkShardOperationsApiCall(client); - mockShardCangesApiCallWithRetry(client, 3, new RuntimeException("unexpected")); - - Executor ccrExecutor = Runnable::run; - ShardId leaderShardId = new ShardId("index1", "index1", 0); - ShardId followShardId = new ShardId("index2", "index1", 0); - IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(), - followShardId.getIndex(), client, client); - - boolean[] invoked = new boolean[1]; - Exception[] exception = new Exception[1]; - Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; - ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, checker, leaderShardId, - followShardId, handler); - chunkProcessor.start(0, 10, Long.MAX_VALUE); - assertThat(invoked[0], is(true)); - assertThat(exception[0], notNullValue()); - assertThat(exception[0].getMessage(), equalTo("unexpected")); - assertThat(chunkProcessor.retryCounter.get(), equalTo(0)); - } - - public void testChunkProcessorExceedMaxTranslogsBytes() { - long from = 0; - long to = 20; - long actualTo = 10; - Client client = createClientMock(); - Queue chunks = new LinkedList<>(); - doAnswer(invocation -> { - Object[] args = invocation.getArguments(); - assert args.length == 3; - @SuppressWarnings("unchecked") - ActionListener listener = (ActionListener) args[2]; - - List operations = new ArrayList<>(); - for (int i = 0; i <= actualTo; i++) { - operations.add(new Translog.NoOp(i, 1, "test")); - } - listener.onResponse(new ShardChangesAction.Response(1L, operations.toArray(new Translog.Operation[0]))); - return null; - }).when(client).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class), any()); - - mockBulkShardOperationsApiCall(client); - Executor ccrExecutor = Runnable::run; - ShardId leaderShardId = new ShardId("index1", "index1", 0); - ShardId followShardId = new ShardId("index2", "index1", 0); - IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(), - followShardId.getIndex(), client, client); - - boolean[] invoked = new boolean[1]; - Exception[] exception = new Exception[1]; - Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; - BiConsumer> versionChecker = (indexVersiuon, consumer) -> consumer.accept(null); - ChunkProcessor chunkProcessor = - new ChunkProcessor(client, client, chunks, ccrExecutor, versionChecker, leaderShardId, followShardId, handler); - chunkProcessor.start(from, to, Long.MAX_VALUE); - assertThat(invoked[0], is(true)); - assertThat(exception[0], nullValue()); - assertThat(chunks.size(), equalTo(1)); - assertThat(chunks.peek()[0], equalTo(11L)); - assertThat(chunks.peek()[1], equalTo(20L)); - } - - private Client createClientMock() { - Client client = mock(Client.class); - ClusterAdminClient clusterAdminClient = mock(ClusterAdminClient.class); - AdminClient adminClient = mock(AdminClient.class); - when(adminClient.cluster()).thenReturn(clusterAdminClient); - when(client.admin()).thenReturn(adminClient); - return client; - } - - private void mockShardCangesApiCallWithRetry(Client client, int testRetryLimit, Exception e) { - int[] retryCounter = new int[1]; - doAnswer(invocation -> { - Object[] args = invocation.getArguments(); - assert args.length == 3; - ShardChangesAction.Request request = (ShardChangesAction.Request) args[1]; - @SuppressWarnings("unchecked") - ActionListener listener = (ActionListener) args[2]; - if (retryCounter[0]++ <= testRetryLimit) { - listener.onFailure(e); - } else { - long delta = request.getMaxSeqNo() - request.getMinSeqNo(); - Translog.Operation[] operations = new Translog.Operation[(int) delta]; - for (int i = 0; i < operations.length; i++) { - operations[i] = new Translog.NoOp(request.getMinSeqNo() + i, 1, "test"); - } - ShardChangesAction.Response response = new ShardChangesAction.Response(0L, operations); - listener.onResponse(response); - } - return null; - }).when(client).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class), any()); - } - - private void mockShardChangesApiCall(Client client) { - doAnswer(invocation -> { - Object[] args = invocation.getArguments(); - assert args.length == 3; - ShardChangesAction.Request request = (ShardChangesAction.Request) args[1]; - @SuppressWarnings("unchecked") - ActionListener listener = (ActionListener) args[2]; - - List operations = new ArrayList<>(); - for (long i = request.getMinSeqNo(); i <= request.getMaxSeqNo(); i++) { - operations.add(new Translog.NoOp(request.getMinSeqNo() + i, 1, "test")); - } - ShardChangesAction.Response response = new ShardChangesAction.Response(0L, operations.toArray(new Translog.Operation[0])); - listener.onResponse(response); - return null; - }).when(client).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class), any()); - } - - private void mockBulkShardOperationsApiCall(Client client) { - doAnswer(invocation -> { - Object[] args = invocation.getArguments(); - assert args.length == 3; - @SuppressWarnings("unchecked") - ActionListener listener = (ActionListener) args[2]; - listener.onResponse(new BulkShardOperationsResponse()); - return null; - }).when(client).execute(same(BulkShardOperationsAction.INSTANCE), any(BulkShardOperationsRequest.class), any()); - } - -} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexRequestTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexRequestTests.java index 18900bc852da5..c68d18499658c 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexRequestTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexRequestTests.java @@ -16,8 +16,6 @@ protected CreateAndFollowIndexAction.Request createBlankInstance() { @Override protected CreateAndFollowIndexAction.Request createTestInstance() { - CreateAndFollowIndexAction.Request request = new CreateAndFollowIndexAction.Request(); - request.setFollowRequest(FollowIndexRequestTests.createTestRequest()); - return request; + return new CreateAndFollowIndexAction.Request(FollowIndexRequestTests.createTestRequest()); } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java index a27294ccf2df5..31a6eb31c81b2 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexActionTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.index.MapperTestUtils; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ccr.ShardChangesIT; import java.io.IOException; @@ -22,10 +23,7 @@ public class FollowIndexActionTests extends ESTestCase { public void testValidation() throws IOException { - FollowIndexAction.Request request = new FollowIndexAction.Request(); - request.setLeaderIndex("index1"); - request.setFollowIndex("index2"); - + FollowIndexAction.Request request = ShardChangesIT.createFollowRequest("index1", "index2"); { // should fail, because leader index does not exist Exception e = expectThrows(IllegalArgumentException.class, () -> FollowIndexAction.validate(request, null, null, null)); @@ -130,7 +128,7 @@ public void testValidation() throws IOException { FollowIndexAction.validate(request, leaderIMD, followIMD, mapperService); } } - + private static IndexMetaData createIMD(String index, int numShards, Tuple... settings) throws IOException { return createIMD(index, State.OPEN, "{\"properties\": {}}", numShards, settings); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexRequestTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexRequestTests.java index 9a0557b369a77..ac5bc9b74626e 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexRequestTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexRequestTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ccr.action; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.AbstractStreamableTestCase; public class FollowIndexRequestTests extends AbstractStreamableTestCase { @@ -20,12 +21,8 @@ protected FollowIndexAction.Request createTestInstance() { } static FollowIndexAction.Request createTestRequest() { - FollowIndexAction.Request request = new FollowIndexAction.Request(); - request.setLeaderIndex(randomAlphaOfLength(4)); - request.setFollowIndex(randomAlphaOfLength(4)); - request.setBatchSize(randomNonNegativeLong()); - request.setConcurrentProcessors(randomIntBetween(0, Integer.MAX_VALUE)); - request.setProcessorMaxTranslogBytes(randomNonNegativeLong()); - return request; + return new FollowIndexAction.Request(randomAlphaOfLength(4), randomAlphaOfLength(4), randomIntBetween(1, Integer.MAX_VALUE), + randomIntBetween(1, Integer.MAX_VALUE), randomNonNegativeLong(), randomIntBetween(1, Integer.MAX_VALUE), + randomIntBetween(1, Integer.MAX_VALUE), TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(500)); } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java index 7e06ce120a86a..adbd738725a7b 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java @@ -5,12 +5,9 @@ */ package org.elasticsearch.xpack.ccr.action; -import org.elasticsearch.Version; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; -import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexService; @@ -25,18 +22,16 @@ import java.util.stream.Collectors; import java.util.stream.LongStream; -import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; public class ShardChangesActionTests extends ESSingleNodeTestCase { - public void testGetOperationsBetween() throws Exception { + public void testGetOperations() throws Exception { final Settings settings = Settings.builder() .put("index.number_of_shards", 1) .put("index.number_of_replicas", 0) .build(); final IndexService indexService = createIndex("index", settings); - IndexMetaData indexMetaData = indexService.getMetaData(); final int numWrites = randomIntBetween(2, 8192); for (int i = 0; i < numWrites; i++) { @@ -49,42 +44,36 @@ public void testGetOperationsBetween() throws Exception { for (int iter = 0; iter < iters; iter++) { int min = randomIntBetween(0, numWrites - 1); int max = randomIntBetween(min, numWrites - 1); - final Translog.Operation[] operations = ShardChangesAction.getOperationsBetween(indexShard, min, max, Long.MAX_VALUE); + int size = max - min; + final Translog.Operation[] operations = ShardChangesAction.getOperations(indexShard, + indexShard.getGlobalCheckpoint(), min, size, Long.MAX_VALUE); final List seenSeqNos = Arrays.stream(operations).map(Translog.Operation::seqNo).collect(Collectors.toList()); final List expectedSeqNos = LongStream.range(min, max + 1).boxed().collect(Collectors.toList()); assertThat(seenSeqNos, equalTo(expectedSeqNos)); } + // get operations for a range no operations exists: - Exception e = expectThrows(IllegalStateException.class, - () -> ShardChangesAction.getOperationsBetween(indexShard, numWrites, numWrites + 1, Long.MAX_VALUE)); - assertThat(e.getMessage(), containsString("Not all operations between min_seqno [" + numWrites + "] and max_seqno [" + - (numWrites + 1) +"] found")); + Translog.Operation[] operations = ShardChangesAction.getOperations(indexShard, indexShard.getGlobalCheckpoint(), + numWrites, numWrites + 1, Long.MAX_VALUE); + assertThat(operations.length, equalTo(0)); // get operations for a range some operations do not exist: - e = expectThrows(IllegalStateException.class, - () -> ShardChangesAction.getOperationsBetween(indexShard, numWrites - 10, numWrites + 10, Long.MAX_VALUE)); - assertThat(e.getMessage(), containsString("Not all operations between min_seqno [" + (numWrites - 10) + "] and max_seqno [" + - (numWrites + 10) +"] found")); + operations = ShardChangesAction.getOperations(indexShard, indexShard.getGlobalCheckpoint(), + numWrites - 10, numWrites + 10, Long.MAX_VALUE); + assertThat(operations.length, equalTo(10)); } - public void testGetOperationsBetweenWhenShardNotStarted() throws Exception { - IndexMetaData indexMetaData = IndexMetaData.builder("index") - .settings(Settings.builder() - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID()) - .build()) - .build(); + public void testGetOperationsWhenShardNotStarted() throws Exception { IndexShard indexShard = Mockito.mock(IndexShard.class); ShardRouting shardRouting = TestShardRouting.newShardRouting("index", 0, "_node_id", true, ShardRoutingState.INITIALIZING); Mockito.when(indexShard.routingEntry()).thenReturn(shardRouting); - expectThrows(IndexShardNotStartedException.class, () -> ShardChangesAction.getOperationsBetween(indexShard, 0, 1, Long.MAX_VALUE)); + expectThrows(IndexShardNotStartedException.class, () -> ShardChangesAction.getOperations(indexShard, + indexShard.getGlobalCheckpoint(), 0, 1, Long.MAX_VALUE)); } - public void testGetOperationsBetweenExceedByteLimit() throws Exception { + public void testGetOperationsExceedByteLimit() throws Exception { final Settings settings = Settings.builder() .put("index.number_of_shards", 1) .put("index.number_of_replicas", 0) @@ -97,7 +86,8 @@ public void testGetOperationsBetweenExceedByteLimit() throws Exception { } final IndexShard indexShard = indexService.getShard(0); - final Translog.Operation[] operations = ShardChangesAction.getOperationsBetween(indexShard, 0, numWrites - 1, 256); + final Translog.Operation[] operations = ShardChangesAction.getOperations(indexShard, indexShard.getGlobalCheckpoint(), + 0, 12, 256); assertThat(operations.length, equalTo(12)); assertThat(operations[0].seqNo(), equalTo(0L)); assertThat(operations[1].seqNo(), equalTo(1L)); @@ -113,4 +103,20 @@ public void testGetOperationsBetweenExceedByteLimit() throws Exception { assertThat(operations[11].seqNo(), equalTo(11L)); } + public void testGetOperationsAlwaysReturnAtLeastOneOp() throws Exception { + final Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .build(); + final IndexService indexService = createIndex("index", settings); + + client().prepareIndex("index", "doc", "0").setSource("{}", XContentType.JSON).get(); + + final IndexShard indexShard = indexService.getShard(0); + final Translog.Operation[] operations = + ShardChangesAction.getOperations(indexShard, indexShard.getGlobalCheckpoint(), 0, 1, 0); + assertThat(operations.length, equalTo(1)); + assertThat(operations[0].seqNo(), equalTo(0L)); + } + } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesRequestTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesRequestTests.java index 3f30545576046..19585da8851d6 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesRequestTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesRequestTests.java @@ -16,8 +16,8 @@ public class ShardChangesRequestTests extends AbstractStreamableTestCase failureHolder = new AtomicReference<>(); + + public void testDefaults() throws Exception { + long followGlobalCheckpoint = randomIntBetween(-1, 2048); + task = createShardFollowTask(ShardFollowNodeTask.DEFAULT_MAX_BATCH_OPERATION_COUNT, + ShardFollowNodeTask.DEFAULT_MAX_CONCURRENT_READ_BATCHES, ShardFollowNodeTask.DEFAULT_MAX_CONCURRENT_WRITE_BATCHES, + 10000, ShardFollowNodeTask.DEFAULT_MAX_WRITE_BUFFER_SIZE, followGlobalCheckpoint); + task.start(followGlobalCheckpoint); + + assertBusy(() -> { + assertThat(task.getStatus().getFollowerGlobalCheckpoint(), equalTo(10000L)); + }); + assertThat(mappingUpdateCounter.get(), equalTo(1)); + } + + public void testHitBufferLimit() throws Exception { + // Setting buffer limit to 100, so that we are sure the limit will be met + task = createShardFollowTask(ShardFollowNodeTask.DEFAULT_MAX_BATCH_OPERATION_COUNT, 3, 1, 10000, 100, -1); + task.start(-1); + + assertBusy(() -> { + assertThat(task.getStatus().getFollowerGlobalCheckpoint(), equalTo(10000L)); + }); + } + + public void testConcurrentReadsAndWrites() throws Exception { + long followGlobalCheckpoint = randomIntBetween(-1, 2048); + task = createShardFollowTask(randomIntBetween(32, 2048), randomIntBetween(2, 10), + randomIntBetween(2, 10), 50000, 10240, followGlobalCheckpoint); + task.start(followGlobalCheckpoint); + + assertBusy(() -> { + assertThat(task.getStatus().getFollowerGlobalCheckpoint(), equalTo(50000L)); + }); + } + + public void testMappingUpdate() throws Exception { + task = createShardFollowTask(1024, 1, 1, 1000, 1024, -1); + task.start(-1); + + assertBusy(() -> { + assertThat(task.getStatus().getFollowerGlobalCheckpoint(), greaterThanOrEqualTo(1000L)); + }); + imdVersion.set(2L); + leaderGlobalCheckPoint.set(10000L); + assertBusy(() -> { + assertThat(task.getStatus().getFollowerGlobalCheckpoint(), equalTo(10000L)); + }); + assertThat(mappingUpdateCounter.get(), equalTo(2)); + } + + public void testOccasionalApiFailure() throws Exception { + task = createShardFollowTask(1024, 1, 1, 10000, 1024, -1); + task.start(-1); + randomlyFailWithRetryableError.set(true); + assertBusy(() -> { + assertThat(task.getStatus().getFollowerGlobalCheckpoint(), equalTo(10000L)); + }); + assertThat(failedRequests.get(), greaterThan(0)); + } + + public void testNotAllExpectedOpsReturned() throws Exception { + task = createShardFollowTask(1024, 1, 1, 10000, 1024, -1); + task.start(-1); + randomlyTruncateRequests.set(true); + assertBusy(() -> { + assertThat(task.getStatus().getFollowerGlobalCheckpoint(), equalTo(10000L)); + }); + assertThat(truncatedRequests.get(), greaterThan(0)); + } + + ShardFollowNodeTask createShardFollowTask(int maxBatchOperationCount, int maxConcurrentReadBatches, int maxConcurrentWriteBathces, + int globalCheckpoint, int bufferWriteLimit, long followGlobalCheckpoint) { + leaderGlobalCheckPoint = new AtomicLong(globalCheckpoint); + imdVersion = new AtomicLong(1L); + mappingUpdateCounter = new AtomicInteger(0); + randomlyTruncateRequests = new AtomicBoolean(false); + truncatedRequests = new AtomicInteger(); + randomlyFailWithRetryableError = new AtomicBoolean(false); + failedRequests = new AtomicInteger(0); + AtomicBoolean stopped = new AtomicBoolean(false); + ShardFollowTask params = new ShardFollowTask(null, new ShardId("follow_index", "", 0), + new ShardId("leader_index", "", 0), maxBatchOperationCount, maxConcurrentReadBatches, + ShardFollowNodeTask.DEFAULT_MAX_BATCH_SIZE_IN_BYTES, maxConcurrentWriteBathces, bufferWriteLimit, + TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10), Collections.emptyMap()); + + BiConsumer scheduler = (delay, task) -> { + try { + Thread.sleep(delay.millis()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + Thread thread = new Thread(task); + thread.start(); + }; + AtomicInteger readCounter = new AtomicInteger(); + AtomicInteger writeCounter = new AtomicInteger(); + LocalCheckpointTracker tracker = new LocalCheckpointTracker(followGlobalCheckpoint, followGlobalCheckpoint); + return new ShardFollowNodeTask(1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler, + TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(500)) { + + @Override + protected void updateMapping(LongConsumer handler) { + mappingUpdateCounter.incrementAndGet(); + handler.accept(imdVersion.get()); + } + + @Override + protected void innerSendBulkShardOperationsRequest(List operations, LongConsumer handler, + Consumer errorHandler) { + if (randomlyFailWithRetryableError.get() && readCounter.incrementAndGet() % 5 == 0) { + failedRequests.incrementAndGet(); + errorHandler.accept(new UnavailableShardsException(params.getFollowShardId(), "test error")); + return; + } + + for(Translog.Operation op : operations) { + tracker.markSeqNoAsCompleted(op.seqNo()); + } + + // Emulate network thread and avoid SO: + Thread thread = new Thread(() -> handler.accept(tracker.getCheckpoint())); + thread.start(); + } + + @Override + protected void innerSendShardChangesRequest(long from, int maxOperationCount, Consumer handler, + Consumer errorHandler) { + if (randomlyFailWithRetryableError.get() && writeCounter.incrementAndGet() % 5 == 0) { + failedRequests.incrementAndGet(); + errorHandler.accept(new UnavailableShardsException(params.getFollowShardId(), "test error")); + return; + } + + if (from < 0) { + errorHandler.accept(new IllegalArgumentException()); + return; + } + + ShardChangesAction.Response response; + if (from > leaderGlobalCheckPoint.get()) { + response = new ShardChangesAction.Response(imdVersion.get(), leaderGlobalCheckPoint.get(), new Translog.Operation[0]); + } else { + if (randomlyTruncateRequests.get() && maxOperationCount > 10 && truncatedRequests.get() < 5) { + truncatedRequests.incrementAndGet(); + maxOperationCount = maxOperationCount / 2; + } + List ops = new ArrayList<>(); + long maxSeqNo = Math.min(from + maxOperationCount, leaderGlobalCheckPoint.get()); + for (long seqNo = from; seqNo <= maxSeqNo; seqNo++) { + String id = UUIDs.randomBase64UUID(); + byte[] source = "{}".getBytes(StandardCharsets.UTF_8); + ops.add(new Translog.Index("doc", id, seqNo, 0, source)); + } + response = new ShardChangesAction.Response(imdVersion.get(), leaderGlobalCheckPoint.get(), + ops.toArray(new Translog.Operation[0])); + } + // Emulate network thread and avoid SO: + Thread thread = new Thread(() -> handler.accept(response)); + thread.start(); + } + + @Override + protected boolean isStopped() { + return stopped.get(); + } + + @Override + public void markAsCompleted() { + stopped.set(true); + } + + @Override + public void markAsFailed(Exception e) { + stopped.set(true); + failureHolder.set(e); + } + }; + } + + @After + public void cancelNodeTask() throws Exception { + if (task != null){ + task.markAsCompleted(); + assertThat(failureHolder.get(), nullValue()); + assertBusy(() -> { + ShardFollowNodeTask.Status status = task.getStatus(); + assertThat(status.getNumberOfConcurrentReads(), equalTo(0)); + assertThat(status.getNumberOfConcurrentWrites(), equalTo(0)); + }); + } + } + +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java index b128e88e63a5a..300794a6c00cf 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ccr.action; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.AbstractSerializingTestCase; @@ -23,11 +24,17 @@ protected ShardFollowTask doParseInstance(XContentParser parser) throws IOExcept @Override protected ShardFollowTask createTestInstance() { return new ShardFollowTask( - randomAlphaOfLength(4), - new ShardId(randomAlphaOfLength(4), randomAlphaOfLength(4), randomInt(5)), - new ShardId(randomAlphaOfLength(4), randomAlphaOfLength(4), randomInt(5)), - randomIntBetween(1, Integer.MAX_VALUE), randomIntBetween(1, Integer.MAX_VALUE), - randomIntBetween(1, Integer.MAX_VALUE), randomBoolean() ? null : Collections.singletonMap("key", "value")); + randomAlphaOfLength(4), + new ShardId(randomAlphaOfLength(4), randomAlphaOfLength(4), randomInt(5)), + new ShardId(randomAlphaOfLength(4), randomAlphaOfLength(4), randomInt(5)), + randomIntBetween(1, Integer.MAX_VALUE), + randomIntBetween(1, Integer.MAX_VALUE), + randomNonNegativeLong(), + randomIntBetween(1, Integer.MAX_VALUE), + randomIntBetween(1, Integer.MAX_VALUE), + TimeValue.parseTimeValue(randomTimeValue(), ""), + TimeValue.parseTimeValue(randomTimeValue(), ""), + randomBoolean() ? null : Collections.singletonMap("key", "value")); } @Override diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java index e837da2b65deb..2b669e9154afb 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java @@ -19,6 +19,8 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; import static org.hamcrest.Matchers.equalTo; @@ -34,22 +36,23 @@ public void testPrimaryTermFromFollower() throws IOException { // we use this primary on the operations yet we expect the applied operations to have the primary term of the follower final long primaryTerm = randomLongBetween(1, Integer.MAX_VALUE); - final Translog.Operation[] operations = new Translog.Operation[randomIntBetween(0, 127)]; - for (int i = 0; i < operations.length; i++) { + int numOps = randomIntBetween(0, 127); + final List operations = new ArrayList<>(randomIntBetween(0, 127)); + for (int i = 0; i < numOps; i++) { final String id = Integer.toString(i); final long seqNo = i; final Translog.Operation.Type type = randomValueOtherThan(Translog.Operation.Type.CREATE, () -> randomFrom(Translog.Operation.Type.values())); switch (type) { case INDEX: - operations[i] = new Translog.Index("_doc", id, seqNo, primaryTerm, 0, VersionType.INTERNAL, SOURCE, null, -1); + operations.add(new Translog.Index("_doc", id, seqNo, primaryTerm, 0, VersionType.INTERNAL, SOURCE, null, -1)); break; case DELETE: - operations[i] = - new Translog.Delete("_doc", id, new Term("_id", Uid.encodeId(id)), seqNo, primaryTerm, 0, VersionType.INTERNAL); + operations.add( + new Translog.Delete("_doc", id, new Term("_id", Uid.encodeId(id)), seqNo, primaryTerm, 0, VersionType.INTERNAL)); break; case NO_OP: - operations[i] = new Translog.NoOp(seqNo, primaryTerm, "test"); + operations.add(new Translog.NoOp(seqNo, primaryTerm, "test")); break; default: throw new IllegalStateException("unexpected operation type [" + type + "]"); @@ -60,7 +63,7 @@ public void testPrimaryTermFromFollower() throws IOException { TransportBulkShardOperationsAction.shardOperationOnPrimary(followerPrimary.shardId(), operations, followerPrimary, logger); try (Translog.Snapshot snapshot = followerPrimary.newTranslogSnapshotFromMinSeqNo(0)) { - assertThat(snapshot.totalOperations(), equalTo(operations.length)); + assertThat(snapshot.totalOperations(), equalTo(operations.size())); Translog.Operation operation; while ((operation = snapshot.next()) != null) { assertThat(operation.primaryTerm(), equalTo(followerPrimary.getPrimaryTerm()));