diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java index 21c56b53f2e62..499d93023e459 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java @@ -71,9 +71,12 @@ public static class Request extends ActionRequest { private String leaderIndex; private String followIndex; - private long batchSize = ShardFollowTasksExecutor.DEFAULT_BATCH_SIZE; - private int concurrentProcessors = ShardFollowTasksExecutor.DEFAULT_CONCURRENT_PROCESSORS; - private long processorMaxTranslogBytes = ShardFollowTasksExecutor.DEFAULT_MAX_TRANSLOG_BYTES; + private int maxReadSize = ShardFollowNodeTask.DEFAULT_MAX_READ_SIZE; + private int maxConcurrentReads = ShardFollowNodeTask.DEFAULT_MAX_CONCURRENT_READS; + private long processorMaxTranslogBytes = ShardFollowNodeTask.DEFAULT_MAX_TRANSLOG_BYTES; + private int maxWriteSize = ShardFollowNodeTask.DEFAULT_MAX_WRITE_SIZE; + private int maxConcurrentWrites = ShardFollowNodeTask.DEFAULT_MAX_CONCURRENT_WRITES; + private int maxBufferSize = ShardFollowNodeTask.DEFAULT_MAX_BUFFER_SIZE; public String getLeaderIndex() { return leaderIndex; @@ -91,23 +94,23 @@ public void setFollowIndex(String followIndex) { this.followIndex = followIndex; } - public long getBatchSize() { - return batchSize; + public int getMaxReadSize() { + return maxReadSize; } - public void setBatchSize(long batchSize) { - if (batchSize < 1) { - throw new IllegalArgumentException("Illegal batch_size [" + batchSize + "]"); + public void setMaxReadSize(int maxReadSize) { + if (maxReadSize < 1) { + throw new IllegalArgumentException("Illegal batch_size [" + maxReadSize + "]"); } - this.batchSize = batchSize; + this.maxReadSize = maxReadSize; } - public void setConcurrentProcessors(int concurrentProcessors) { - if (concurrentProcessors < 1) { + public void setMaxConcurrentReads(int maxConcurrentReads) { + if (maxConcurrentReads < 1) { throw new IllegalArgumentException("concurrent_processors must be larger than 0"); } - this.concurrentProcessors = concurrentProcessors; + this.maxConcurrentReads = maxConcurrentReads; } public void setProcessorMaxTranslogBytes(long processorMaxTranslogBytes) { @@ -117,6 +120,39 @@ public void setProcessorMaxTranslogBytes(long processorMaxTranslogBytes) { this.processorMaxTranslogBytes = processorMaxTranslogBytes; } + public int getMaxWriteSize() { + return maxWriteSize; + } + + public void setMaxWriteSize(int maxWriteSize) { + if (maxWriteSize < 1) { + throw new IllegalArgumentException("maxWriteSize must be larger than 0"); + } + this.maxWriteSize = maxWriteSize; + } + + public int getMaxConcurrentWrites() { + return maxConcurrentWrites; + } + + public void setMaxConcurrentWrites(int maxConcurrentWrites) { + if (maxConcurrentWrites < 1) { + throw new IllegalArgumentException("maxConcurrentWrites must be larger than 0"); + } + this.maxConcurrentWrites = maxConcurrentWrites; + } + + public int getMaxBufferSize() { + return maxBufferSize; + } + + public void setMaxBufferSize(int maxBufferSize) { + if (maxBufferSize < 1) { + throw new IllegalArgumentException("maxBufferSize must be larger than 0"); + } + this.maxBufferSize = maxBufferSize; + } + @Override public ActionRequestValidationException validate() { return null; @@ -127,9 +163,12 @@ public void readFrom(StreamInput in) throws IOException { super.readFrom(in); leaderIndex = in.readString(); followIndex = in.readString(); - batchSize = in.readVLong(); - concurrentProcessors = in.readVInt(); + maxReadSize = in.readVInt(); + maxConcurrentReads = in.readVInt(); processorMaxTranslogBytes = in.readVLong(); + maxWriteSize = in.readVInt(); + maxConcurrentWrites = in.readVInt(); + maxBufferSize = in.readVInt(); } @Override @@ -137,9 +176,12 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(leaderIndex); out.writeString(followIndex); - out.writeVLong(batchSize); - out.writeVInt(concurrentProcessors); + out.writeVInt(maxReadSize); + out.writeVInt(maxConcurrentReads); out.writeVLong(processorMaxTranslogBytes); + out.writeVInt(maxWriteSize); + out.writeVInt(maxConcurrentWrites); + out.writeVInt(maxBufferSize); } @Override @@ -147,16 +189,20 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Request request = (Request) o; - return batchSize == request.batchSize && - concurrentProcessors == request.concurrentProcessors && + return maxReadSize == request.maxReadSize && + maxConcurrentReads == request.maxConcurrentReads && processorMaxTranslogBytes == request.processorMaxTranslogBytes && + maxWriteSize == request.maxWriteSize && + maxConcurrentWrites == request.maxConcurrentWrites && + maxBufferSize == request.maxBufferSize && Objects.equals(leaderIndex, request.leaderIndex) && Objects.equals(followIndex, request.followIndex); } @Override public int hashCode() { - return Objects.hash(leaderIndex, followIndex, batchSize, concurrentProcessors, processorMaxTranslogBytes); + return Objects.hash(leaderIndex, followIndex, maxReadSize, maxConcurrentReads, processorMaxTranslogBytes, + maxWriteSize, maxConcurrentWrites, maxBufferSize); } } @@ -254,7 +300,8 @@ void start(Request request, String clusterNameAlias, IndexMetaData leaderIndexMe ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias, new ShardId(followIndexMetadata.getIndex(), shardId), new ShardId(leaderIndexMetadata.getIndex(), shardId), - request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes, filteredHeaders); + request.maxReadSize, request.maxConcurrentReads, request.processorMaxTranslogBytes, + request.maxWriteSize, request.maxConcurrentWrites, request.maxBufferSize, filteredHeaders); persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask, new ActionListener>() { @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index c72b13a701f41..b746f8dcd4225 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -54,9 +54,9 @@ public Response newResponse() { public static class Request extends SingleShardRequest { private long minSeqNo; - private long maxSeqNo; + private Long maxSeqNo; private ShardId shardId; - private long maxTranslogsBytes = ShardFollowTasksExecutor.DEFAULT_MAX_TRANSLOG_BYTES; + private long maxTranslogsBytes = ShardFollowNodeTask.DEFAULT_MAX_TRANSLOG_BYTES; public Request(ShardId shardId) { super(shardId.getIndexName()); @@ -78,11 +78,11 @@ public void setMinSeqNo(long minSeqNo) { this.minSeqNo = minSeqNo; } - public long getMaxSeqNo() { + public Long getMaxSeqNo() { return maxSeqNo; } - public void setMaxSeqNo(long maxSeqNo) { + public void setMaxSeqNo(Long maxSeqNo) { this.maxSeqNo = maxSeqNo; } @@ -100,7 +100,7 @@ public ActionRequestValidationException validate() { if (minSeqNo < 0) { validationException = addValidationError("minSeqNo [" + minSeqNo + "] cannot be lower than 0", validationException); } - if (maxSeqNo < minSeqNo) { + if (maxSeqNo != null && maxSeqNo < minSeqNo) { validationException = addValidationError("minSeqNo [" + minSeqNo + "] cannot be larger than maxSeqNo [" + maxSeqNo + "]", validationException); } @@ -115,7 +115,7 @@ public ActionRequestValidationException validate() { public void readFrom(StreamInput in) throws IOException { super.readFrom(in); minSeqNo = in.readVLong(); - maxSeqNo = in.readVLong(); + maxSeqNo = in.readOptionalLong(); shardId = ShardId.readShardId(in); maxTranslogsBytes = in.readVLong(); } @@ -124,7 +124,7 @@ public void readFrom(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeVLong(minSeqNo); - out.writeVLong(maxSeqNo); + out.writeOptionalLong(maxSeqNo); shardId.writeTo(out); out.writeVLong(maxTranslogsBytes); } @@ -136,7 +136,7 @@ public boolean equals(final Object o) { if (o == null || getClass() != o.getClass()) return false; final Request request = (Request) o; return minSeqNo == request.minSeqNo && - maxSeqNo == request.maxSeqNo && + Objects.equals(maxSeqNo, request.maxSeqNo) && Objects.equals(shardId, request.shardId) && maxTranslogsBytes == request.maxTranslogsBytes; } @@ -150,13 +150,15 @@ public int hashCode() { public static final class Response extends ActionResponse { private long indexMetadataVersion; + private long leaderGlobalCheckpoint; private Translog.Operation[] operations; Response() { } - Response(long indexMetadataVersion, final Translog.Operation[] operations) { + Response(long indexMetadataVersion, long leaderGlobalCheckpoint, final Translog.Operation[] operations) { this.indexMetadataVersion = indexMetadataVersion; + this.leaderGlobalCheckpoint = leaderGlobalCheckpoint; this.operations = operations; } @@ -164,6 +166,10 @@ public long getIndexMetadataVersion() { return indexMetadataVersion; } + public long getLeaderGlobalCheckpoint() { + return leaderGlobalCheckpoint; + } + public Translog.Operation[] getOperations() { return operations; } @@ -172,6 +178,7 @@ public Translog.Operation[] getOperations() { public void readFrom(final StreamInput in) throws IOException { super.readFrom(in); indexMetadataVersion = in.readVLong(); + leaderGlobalCheckpoint = in.readZLong(); operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new); } @@ -179,6 +186,7 @@ public void readFrom(final StreamInput in) throws IOException { public void writeTo(final StreamOutput out) throws IOException { super.writeTo(out); out.writeVLong(indexMetadataVersion); + out.writeZLong(leaderGlobalCheckpoint); out.writeArray(Translog.Operation::writeOperation, operations); } @@ -188,13 +196,15 @@ public boolean equals(final Object o) { if (o == null || getClass() != o.getClass()) return false; final Response response = (Response) o; return indexMetadataVersion == response.indexMetadataVersion && - Arrays.equals(operations, response.operations); + leaderGlobalCheckpoint == response.leaderGlobalCheckpoint && + Arrays.equals(operations, response.operations); } @Override public int hashCode() { int result = 1; result += Objects.hashCode(indexMetadataVersion); + result += Objects.hashCode(leaderGlobalCheckpoint); result += Arrays.hashCode(operations); return result; } @@ -222,14 +232,16 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex()); IndexShard indexShard = indexService.getShard(request.getShard().id()); final long indexMetaDataVersion = clusterService.state().metaData().index(shardId.getIndex()).getVersion(); + // The following shard generates this request based on the global checkpoint on the primary copy on the leader. // Although this value might not have been synced to all replica copies on the leader, the requesting range // is guaranteed to be at most the local-checkpoint of any shard copies on the leader. - assert request.maxSeqNo <= indexShard.getLocalCheckpoint() : "invalid request from_seqno=[" + request.minSeqNo + "]," + - " to_seqno=[" + request.maxSeqNo + "], local_checkpoint=[" + indexShard.getLocalCheckpoint() + "]"; + assert request.maxSeqNo == null || request.maxSeqNo <= indexShard.getLocalCheckpoint() : "invalid request from_seqno=[" + + request.minSeqNo + "]," + " to_seqno=[" + request.maxSeqNo + "], local_checkpoint=[" + + indexShard.getLocalCheckpoint() + "]"; final Translog.Operation[] operations = getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo, request.maxTranslogsBytes); - return new Response(indexMetaDataVersion, operations); + return new Response(indexMetaDataVersion, indexShard.getGlobalCheckpoint(), operations); } @Override @@ -254,14 +266,15 @@ protected Response newResponse() { private static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0]; - static Translog.Operation[] getOperationsBetween(IndexShard indexShard, long minSeqNo, long maxSeqNo, + static Translog.Operation[] getOperationsBetween(IndexShard indexShard, long minSeqNo, Long maxSeqNo, long byteLimit) throws IOException { if (indexShard.state() != IndexShardState.STARTED) { throw new IndexShardNotStartedException(indexShard.shardId(), indexShard.state()); } int seenBytes = 0; final List operations = new ArrayList<>(); - try (Translog.Snapshot snapshot = indexShard.newLuceneChangesSnapshot("ccr", minSeqNo, maxSeqNo, true)) { + long max = maxSeqNo != null ? maxSeqNo : minSeqNo + 1000; + try (Translog.Snapshot snapshot = indexShard.newLuceneChangesSnapshot("ccr", minSeqNo, max, true)) { Translog.Operation op; while ((op = snapshot.next()) != null) { if (op.getSource() == null) { @@ -274,6 +287,15 @@ static Translog.Operation[] getOperationsBetween(IndexShard indexShard, long min break; } } + } catch (IllegalStateException e) { + // TODO: handle peek reads better. + // Should this optional upper bound leak into the newLuceneChangesSnapshot(...) method? + if (maxSeqNo != null) { + throw e; + } else if (e.getMessage().contains("prematurely terminated last_seen_seqno") == false) { + // Only fail if there are gaps between the ops. + throw e; + } } return operations.toArray(EMPTY_OPERATIONS_ARRAY); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 8777fd0eabe95..5fcfd44d27fd2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -5,28 +5,360 @@ */ package org.elasticsearch.xpack.ccr.action; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.NoShardAvailableActionException; +import org.elasticsearch.action.UnavailableShardsException; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.transport.NetworkExceptionHelper; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.transport.ActionTransportException; +import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction; +import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest; +import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; import java.io.IOException; +import java.util.Arrays; +import java.util.LinkedList; import java.util.Map; import java.util.Objects; -import java.util.concurrent.atomic.AtomicLong; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.LongConsumer; +/** + * The node task that fetch the write operations from a leader shard and + * persists these ops in the follower shard. + */ public class ShardFollowNodeTask extends AllocatedPersistentTask { - private final AtomicLong processedGlobalCheckpoint = new AtomicLong(); + static final int DEFAULT_MAX_READ_SIZE = 1024; + static final int DEFAULT_MAX_WRITE_SIZE = 1024; + static final int RETRY_LIMIT = 10; + static final int DEFAULT_MAX_CONCURRENT_READS = 1; + static final int DEFAULT_MAX_CONCURRENT_WRITES = 1; + static final int DEFAULT_MAX_BUFFER_SIZE = 10240; + static final long DEFAULT_MAX_TRANSLOG_BYTES= Long.MAX_VALUE; + private static final TimeValue RETRY_TIMEOUT = TimeValue.timeValueMillis(500); + + private static final Logger LOGGER = Loggers.getLogger(ShardFollowNodeTask.class); + + final Client leaderClient; + final Client followerClient; + private final ShardFollowTask params; + private final BiConsumer scheduler; + + private volatile long lastRequestedSeqno; + private volatile long leaderGlobalCheckpoint; + + private volatile int numConcurrentReads = 0; + private volatile int numConcurrentWrites = 0; + private volatile long processedGlobalCheckpoint = 0; + private volatile long currentIndexMetadataVersion = 0; + private final AtomicInteger retryCounter = new AtomicInteger(0); + private final Queue buffer = new LinkedList<>(); - ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map headers) { + ShardFollowNodeTask(long id, + String type, + String action, + String description, + TaskId parentTask, + Map headers, + Client leaderClient, + Client followerClient, + ShardFollowTask params, + BiConsumer scheduler) { super(id, type, action, description, parentTask, headers); + this.leaderClient = leaderClient; + this.followerClient = followerClient; + this.params = params; + this.scheduler = scheduler; + } + + void start(long leaderGlobalCheckpoint, long followGlobalCheckpoint) { + this.lastRequestedSeqno = followGlobalCheckpoint; + this.processedGlobalCheckpoint = followGlobalCheckpoint; + this.leaderGlobalCheckpoint = leaderGlobalCheckpoint; + + // Forcefully updates follower mapping, this gets us the leader imd version and + // makes sure that leader and follower mapping are identical. + updateMapping(imdVersion -> { + currentIndexMetadataVersion = imdVersion; + LOGGER.info("{} Started to follow leader shard {}, leaderGlobalCheckPoint={}, followGlobalCheckPoint={}", + params.getFollowShardId(), params.getLeaderShardId(), leaderGlobalCheckpoint, followGlobalCheckpoint); + coordinateReads(); + }); + } + + private synchronized void coordinateReads() { + if (isStopped()) { + LOGGER.info("{} shard follow task has been stopped", params.getFollowShardId()); + return; + } + + LOGGER.trace("{} coordinate reads, lastRequestedSeqno={}, leaderGlobalCheckpoint={}", + params.getFollowShardId(), lastRequestedSeqno, leaderGlobalCheckpoint); + final long maxReadSize = params.getMaxReadSize(); + final long maxConcurrentReads = params.getMaxConcurrentReads(); + if (lastRequestedSeqno < leaderGlobalCheckpoint) { + while (true) { + if (lastRequestedSeqno >= leaderGlobalCheckpoint) { + LOGGER.debug("{} no new reads to coordinate lastRequestedSeqno [{}] leaderGlobalCheckpoint [{}]", + params.getLeaderShardId(), lastRequestedSeqno, leaderGlobalCheckpoint); + break; + } + if (numConcurrentReads >= maxConcurrentReads) { + LOGGER.debug("{} no new reads, maximum number of concurrent reads have been reached [{}]", + params.getFollowShardId(), numConcurrentReads); + break; + } + if (buffer.size() > params.getMaxBufferSize()) { + LOGGER.debug("{} no new reads, buffer limit has been reached [{}]", params.getFollowShardId(), buffer.size()); + break; + } + numConcurrentReads++; + long from = lastRequestedSeqno + 1; + long to = from + maxReadSize <= leaderGlobalCheckpoint ? from + maxReadSize : leaderGlobalCheckpoint; + LOGGER.debug("{}[{}] read [{}/{}]", params.getFollowShardId(), numConcurrentReads, from, to); + sendShardChangesRequest(from, to); + lastRequestedSeqno = to; + } + if (numConcurrentReads == 0) { + LOGGER.debug("{} re-scheduling coordinate reads phase", params.getFollowShardId()); + scheduler.accept(TimeValue.timeValueMillis(500), this::coordinateReads); + } + } else { + if (numConcurrentReads == 0) { + LOGGER.debug("{} scheduling peek read", params.getFollowShardId()); + scheduler.accept(TimeValue.timeValueMillis(500), () -> { + synchronized (this) { + // We sneak peek if there is any thing new in the leader primary. + // If there is we will happily accept + numConcurrentReads++; + long from = lastRequestedSeqno + 1; + LOGGER.debug("{}[{}] peek read [{}]", params.getFollowShardId(), numConcurrentReads, from); + sendShardChangesRequest(from, null); + } + }); + } + } + } + + private synchronized void coordinateWrites() { + while (true) { + if (buffer.isEmpty()) { + LOGGER.debug("{} no writes to coordinate, because buffer is empty", params.getFollowShardId()); + break; + } + if (numConcurrentWrites >= params.getMaxConcurrentWrites()) { + LOGGER.debug("{} maximum number of concurrent writes have been reached [{}]", + params.getFollowShardId(), numConcurrentWrites); + break; + } + Translog.Operation[] ops = new Translog.Operation[Math.min(params.getMaxWriteSize(), buffer.size())]; + for (int i = 0; i < ops.length; i++) { + ops[i] = buffer.remove(); + } + numConcurrentWrites++; + LOGGER.debug("{}[{}] write [{}/{}] [{}]", params.getFollowShardId(), numConcurrentWrites, ops[0].seqNo(), + ops[ops.length - 1].seqNo(), ops.length); + sendBulkShardOperationsRequest(ops); + } + } + + private void sendShardChangesRequest(long from, Long to) { + innerSendShardChangesRequest(from, to, + response -> { + retryCounter.set(0); + handleResponse(from, to, response); + }, + e -> handleFailure(e, () -> sendShardChangesRequest(from, to))); + } + + private synchronized void handleResponse(long from, Long to, ShardChangesAction.Response response) { + maybeUpdateMapping(response.getIndexMetadataVersion(), () -> { + synchronized (ShardFollowNodeTask.this) { + leaderGlobalCheckpoint = Math.max(leaderGlobalCheckpoint, response.getLeaderGlobalCheckpoint()); + if (response.getOperations().length == 0) { + numConcurrentReads--; + if (numConcurrentWrites == 0) { + coordinateWrites(); + } + coordinateReads(); + } else { + Translog.Operation firstOp = response.getOperations()[0]; + assert firstOp.seqNo() == from; + Translog.Operation lastOp = response.getOperations()[response.getOperations().length - 1]; + + LOGGER.debug("{} received [{}/{}]", params.getFollowShardId(), firstOp.seqNo(), lastOp.seqNo()); + buffer.addAll(Arrays.asList(response.getOperations())); + if (to == null) { + lastRequestedSeqno = Math.max(lastRequestedSeqno, lastOp.seqNo()); + LOGGER.debug("{} post updating lastRequestedSeqno to [{}]", params.getFollowShardId(), lastRequestedSeqno); + numConcurrentReads--; + } else { + if (lastOp.seqNo() < to) { + long newFrom = lastOp.seqNo() + 1; + LOGGER.debug("{} received [{}] as last op while [{}] was expected, continue to read [{}/{}]...", + params.getFollowShardId(), lastOp.seqNo(), to, newFrom, to); + sendShardChangesRequest(newFrom, to); + } else { + numConcurrentReads--; + } + } + if (numConcurrentWrites == 0) { + coordinateWrites(); + } + coordinateReads(); + } + assert numConcurrentReads >= 0; + } + }); + } + + private void sendBulkShardOperationsRequest(Translog.Operation[] operations) { + innerSendBulkShardOperationsRequest(operations, + followerLocalCheckpoint -> { + retryCounter.set(0); + handleResponse(followerLocalCheckpoint); + }, + e -> handleFailure(e, () -> sendBulkShardOperationsRequest(operations)) + ); + } + + private synchronized void handleResponse(long followerLocalCheckpoint) { + processedGlobalCheckpoint = Math.max(processedGlobalCheckpoint, followerLocalCheckpoint); + numConcurrentWrites--; + assert numConcurrentWrites >= 0; + coordinateWrites(); + } + + private void maybeUpdateMapping(Long minimumRequiredIndexMetadataVersion, Runnable task) { + assert Thread.holdsLock(this); + if (currentIndexMetadataVersion >= minimumRequiredIndexMetadataVersion) { + LOGGER.trace("{} index metadata version [{}] is higher or equal than minimum required index metadata version [{}]", + params.getFollowShardId(), currentIndexMetadataVersion, minimumRequiredIndexMetadataVersion); + task.run(); + } else { + LOGGER.debug("{} updating mapping, index metadata version [{}] is lower than minimum required index metadata version [{}]", + params.getFollowShardId(), currentIndexMetadataVersion, minimumRequiredIndexMetadataVersion); + updateMapping(imdVersion -> { + retryCounter.set(0); + currentIndexMetadataVersion = imdVersion; + task.run(); + }); + } + } + + private void handleFailure(Exception e, Runnable task) { + assert e != null; + if (shouldRetry(e)) { + if (isStopped() == false && retryCounter.incrementAndGet() <= RETRY_LIMIT) { + LOGGER.warn("error during follow shard task, retrying...", e); + scheduler.accept(RETRY_TIMEOUT, task); + } else { + markAsFailed(new ElasticsearchException("retrying failed [" + retryCounter.get() + + "] times, aborting...", e)); + } + } else { + markAsFailed(e); + } + } + + private boolean shouldRetry(Exception e) { + // TODO: What other exceptions should be retried? + return NetworkExceptionHelper.isConnectException(e) || + NetworkExceptionHelper.isCloseConnectionException(e) || + e instanceof ActionTransportException || + e instanceof NodeClosedException || + e instanceof UnavailableShardsException || + e instanceof NoShardAvailableActionException; + } + + // These methods are protected for testing purposes: + protected void updateMapping(LongConsumer handler) { + Index leaderIndex = params.getLeaderShardId().getIndex(); + Index followIndex = params.getFollowShardId().getIndex(); + + ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.clear(); + clusterStateRequest.metaData(true); + clusterStateRequest.indices(leaderIndex.getName()); + + leaderClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> { + IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex); + assert indexMetaData.getMappings().size() == 1; + MappingMetaData mappingMetaData = indexMetaData.getMappings().iterator().next().value; + + PutMappingRequest putMappingRequest = new PutMappingRequest(followIndex.getName()); + putMappingRequest.type(mappingMetaData.type()); + putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON); + followerClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap( + putMappingResponse -> handler.accept(indexMetaData.getVersion()), + e -> handleFailure(e, () -> updateMapping(handler)))); + }, e -> handleFailure(e, () -> updateMapping(handler)))); + } + + protected void innerSendBulkShardOperationsRequest(Translog.Operation[] operations, + LongConsumer handler, + Consumer errorHandler) { + final BulkShardOperationsRequest request = new BulkShardOperationsRequest(params.getFollowShardId(), operations); + followerClient.execute(BulkShardOperationsAction.INSTANCE, request, + new ActionListener() { + @Override + public void onResponse(BulkShardOperationsResponse response) { + handler.accept(response.getLocalCheckpoint()); + } + + @Override + public void onFailure(Exception e) { + errorHandler.accept(e); + } + } + ); + } + + protected void innerSendShardChangesRequest(long from, + Long to, + Consumer handler, + Consumer errorHandler) { + ShardChangesAction.Request request = new ShardChangesAction.Request(params.getLeaderShardId()); + request.setMinSeqNo(from); + request.setMaxSeqNo(to); + request.setMaxTranslogsBytes(params.getMaxTranslogBytes()); + leaderClient.execute(ShardChangesAction.INSTANCE, request, new ActionListener() { + @Override + public void onResponse(ShardChangesAction.Response response) { + handler.accept(response); + } + + @Override + public void onFailure(Exception e) { + errorHandler.accept(e); + } + }); } @Override @@ -34,17 +366,13 @@ protected void onCancelled() { markAsCompleted(); } - public boolean isRunning() { - return isCancelled() == false && isCompleted() == false; - } - - void updateProcessedGlobalCheckpoint(long processedGlobalCheckpoint) { - this.processedGlobalCheckpoint.set(processedGlobalCheckpoint); + protected boolean isStopped() { + return isCancelled() || isCompleted(); } @Override - public Task.Status getStatus() { - return new Status(processedGlobalCheckpoint.get()); + public Status getStatus() { + return new Status(processedGlobalCheckpoint, numConcurrentReads, numConcurrentWrites); } public static class Status implements Task.Status { @@ -52,28 +380,46 @@ public static class Status implements Task.Status { public static final String NAME = "shard-follow-node-task-status"; static final ParseField PROCESSED_GLOBAL_CHECKPOINT_FIELD = new ParseField("processed_global_checkpoint"); + static final ParseField NUMBER_OF_CONCURRENT_READS_FIELD = new ParseField("number_of_concurrent_reads"); + static final ParseField NUMBER_OF_CONCURRENT_WRITES_FIELD = new ParseField("number_of_concurrent_writes"); static final ConstructingObjectParser PARSER = - new ConstructingObjectParser<>(NAME, args -> new Status((Long) args[0])); + new ConstructingObjectParser<>(NAME, args -> new Status((long) args[0], (int) args[1], (int) args[2])); static { PARSER.declareLong(ConstructingObjectParser.constructorArg(), PROCESSED_GLOBAL_CHECKPOINT_FIELD); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_READS_FIELD); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_WRITES_FIELD); } private final long processedGlobalCheckpoint; + private final int numberOfConcurrentReads; + private final int numberOfConcurrentWrites; - Status(long processedGlobalCheckpoint) { + Status(long processedGlobalCheckpoint, int numberOfConcurrentReads, int numberOfConcurrentWrites) { this.processedGlobalCheckpoint = processedGlobalCheckpoint; + this.numberOfConcurrentReads = numberOfConcurrentReads; + this.numberOfConcurrentWrites = numberOfConcurrentWrites; } public Status(StreamInput in) throws IOException { this.processedGlobalCheckpoint = in.readZLong(); + this.numberOfConcurrentReads = in.readVInt(); + this.numberOfConcurrentWrites = in.readVInt(); } public long getProcessedGlobalCheckpoint() { return processedGlobalCheckpoint; } + public int getNumberOfConcurrentReads() { + return numberOfConcurrentReads; + } + + public int getNumberOfConcurrentWrites() { + return numberOfConcurrentWrites; + } + @Override public String getWriteableName() { return NAME; @@ -82,6 +428,8 @@ public String getWriteableName() { @Override public void writeTo(StreamOutput out) throws IOException { out.writeZLong(processedGlobalCheckpoint); + out.writeVInt(numberOfConcurrentReads); + out.writeVInt(numberOfConcurrentWrites); } @Override @@ -90,6 +438,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws { builder.field(PROCESSED_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), processedGlobalCheckpoint); } + { + builder.field(NUMBER_OF_CONCURRENT_READS_FIELD.getPreferredName(), numberOfConcurrentReads); + } + { + builder.field(NUMBER_OF_CONCURRENT_WRITES_FIELD.getPreferredName(), numberOfConcurrentWrites); + } builder.endObject(); return builder; } @@ -103,12 +457,14 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Status status = (Status) o; - return processedGlobalCheckpoint == status.processedGlobalCheckpoint; + return processedGlobalCheckpoint == status.processedGlobalCheckpoint && + numberOfConcurrentReads == status.numberOfConcurrentReads && + numberOfConcurrentWrites == status.numberOfConcurrentWrites; } @Override public int hashCode() { - return Objects.hash(processedGlobalCheckpoint); + return Objects.hash(processedGlobalCheckpoint, numberOfConcurrentReads, numberOfConcurrentWrites); } public String toString() { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java index e97f8e7dc0c3c..a92172b390708 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java @@ -40,15 +40,18 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { static final ParseField LEADER_SHARD_INDEX_UUID_FIELD = new ParseField("leader_shard_index_uuid"); static final ParseField LEADER_SHARD_SHARDID_FIELD = new ParseField("leader_shard_shard"); static final ParseField HEADERS = new ParseField("headers"); - public static final ParseField MAX_CHUNK_SIZE = new ParseField("max_chunk_size"); - public static final ParseField NUM_CONCURRENT_CHUNKS = new ParseField("max_concurrent_chunks"); - public static final ParseField PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST = new ParseField("processor_max_translog_bytes"); + public static final ParseField MAX_READ_SIZE = new ParseField("max_read_size"); + public static final ParseField MAX_CONCURRENT_READS = new ParseField("max_concurrent_reads"); + public static final ParseField MAX_TRANSLOG_BYTES_PER_REQUEST = new ParseField("max_translog_bytes"); + public static final ParseField MAX_WRITE_SIZE = new ParseField("max_write_size"); + public static final ParseField MAX_CONCURRENT_WRITES = new ParseField("max_concurrent_writes"); + public static final ParseField MAX_BUFFER_SIZE = new ParseField("max_buffer_size"); @SuppressWarnings("unchecked") - public static ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, + private static ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, (a) -> new ShardFollowTask((String) a[0], new ShardId((String) a[1], (String) a[2], (int) a[3]), - new ShardId((String) a[4], (String) a[5], (int) a[6]), (long) a[7], (int) a[8], (long) a[9], - (Map) a[10])); + new ShardId((String) a[4], (String) a[5], (int) a[6]), (int) a[7], (int) a[8], (long) a[9], + (int) a[10], (int) a[11], (int) a[12], (Map) a[13])); static { PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), LEADER_CLUSTER_ALIAS_FIELD); @@ -58,28 +61,38 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_SHARD_INDEX_FIELD); PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_SHARD_INDEX_UUID_FIELD); PARSER.declareInt(ConstructingObjectParser.constructorArg(), LEADER_SHARD_SHARDID_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), MAX_CHUNK_SIZE); - PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUM_CONCURRENT_CHUNKS); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_READ_SIZE); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_CONCURRENT_READS); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), MAX_TRANSLOG_BYTES_PER_REQUEST); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_WRITE_SIZE); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_CONCURRENT_WRITES); + PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_BUFFER_SIZE); PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), HEADERS); } private final String leaderClusterAlias; private final ShardId followShardId; private final ShardId leaderShardId; - private final long maxChunkSize; - private final int numConcurrentChunks; - private final long processorMaxTranslogBytes; + private final int maxReadSize; + private final int maxConcurrentReads; + private final long maxTranslogBytes; + private final int maxWriteSize; + private final int maxConcurrentWrites; + private final int maxBufferSize; private final Map headers; - ShardFollowTask(String leaderClusterAlias, ShardId followShardId, ShardId leaderShardId, long maxChunkSize, - int numConcurrentChunks, long processorMaxTranslogBytes, Map headers) { + ShardFollowTask(String leaderClusterAlias, ShardId followShardId, ShardId leaderShardId, int maxReadSize, + int maxConcurrentReads, long maxTranslogBytes, int maxWriteSize, int maxConcurrentWrites, + int maxBufferSize, Map headers) { this.leaderClusterAlias = leaderClusterAlias; this.followShardId = followShardId; this.leaderShardId = leaderShardId; - this.maxChunkSize = maxChunkSize; - this.numConcurrentChunks = numConcurrentChunks; - this.processorMaxTranslogBytes = processorMaxTranslogBytes; + this.maxReadSize = maxReadSize; + this.maxConcurrentReads = maxConcurrentReads; + this.maxTranslogBytes = maxTranslogBytes; + this.maxWriteSize = maxWriteSize; + this.maxConcurrentWrites = maxConcurrentWrites; + this.maxBufferSize = maxBufferSize; this.headers = headers != null ? Collections.unmodifiableMap(headers) : Collections.emptyMap(); } @@ -87,9 +100,12 @@ public ShardFollowTask(StreamInput in) throws IOException { this.leaderClusterAlias = in.readOptionalString(); this.followShardId = ShardId.readShardId(in); this.leaderShardId = ShardId.readShardId(in); - this.maxChunkSize = in.readVLong(); - this.numConcurrentChunks = in.readVInt(); - this.processorMaxTranslogBytes = in.readVLong(); + this.maxReadSize = in.readVInt(); + this.maxConcurrentReads = in.readVInt(); + this.maxTranslogBytes = in.readVLong(); + this.maxWriteSize = in.readVInt(); + this.maxConcurrentWrites= in.readVInt(); + this.maxBufferSize = in.readVInt(); this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString)); } @@ -105,16 +121,28 @@ public ShardId getLeaderShardId() { return leaderShardId; } - public long getMaxChunkSize() { - return maxChunkSize; + public int getMaxReadSize() { + return maxReadSize; } - public int getNumConcurrentChunks() { - return numConcurrentChunks; + public int getMaxWriteSize() { + return maxWriteSize; } - public long getProcessorMaxTranslogBytes() { - return processorMaxTranslogBytes; + public int getMaxConcurrentReads() { + return maxConcurrentReads; + } + + public int getMaxConcurrentWrites() { + return maxConcurrentWrites; + } + + public int getMaxBufferSize() { + return maxBufferSize; + } + + public long getMaxTranslogBytes() { + return maxTranslogBytes; } public Map getHeaders() { @@ -131,9 +159,12 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(leaderClusterAlias); followShardId.writeTo(out); leaderShardId.writeTo(out); - out.writeVLong(maxChunkSize); - out.writeVInt(numConcurrentChunks); - out.writeVLong(processorMaxTranslogBytes); + out.writeVLong(maxReadSize); + out.writeVInt(maxConcurrentReads); + out.writeVLong(maxTranslogBytes); + out.writeVInt(maxWriteSize); + out.writeVInt(maxConcurrentWrites); + out.writeVInt(maxBufferSize); out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString); } @@ -144,19 +175,50 @@ public static ShardFollowTask fromXContent(XContentParser parser) { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - if (leaderClusterAlias != null) { - builder.field(LEADER_CLUSTER_ALIAS_FIELD.getPreferredName(), leaderClusterAlias); + { + if (leaderClusterAlias != null) { + builder.field(LEADER_CLUSTER_ALIAS_FIELD.getPreferredName(), leaderClusterAlias); + } + } + { + builder.field(FOLLOW_SHARD_INDEX_FIELD.getPreferredName(), followShardId.getIndex().getName()); + } + { + builder.field(FOLLOW_SHARD_INDEX_UUID_FIELD.getPreferredName(), followShardId.getIndex().getUUID()); + } + { + builder.field(FOLLOW_SHARD_SHARDID_FIELD.getPreferredName(), followShardId.id()); + } + { + builder.field(LEADER_SHARD_INDEX_FIELD.getPreferredName(), leaderShardId.getIndex().getName()); + } + { + builder.field(LEADER_SHARD_INDEX_UUID_FIELD.getPreferredName(), leaderShardId.getIndex().getUUID()); + } + { + builder.field(LEADER_SHARD_SHARDID_FIELD.getPreferredName(), leaderShardId.id()); + } + { + builder.field(MAX_READ_SIZE.getPreferredName(), maxReadSize); + } + { + builder.field(MAX_CONCURRENT_READS.getPreferredName(), maxConcurrentReads); + } + { + builder.field(MAX_TRANSLOG_BYTES_PER_REQUEST.getPreferredName(), maxTranslogBytes); + } + { + builder.field(MAX_WRITE_SIZE.getPreferredName(), maxWriteSize); + } + { + builder.field(MAX_CONCURRENT_WRITES.getPreferredName(), maxConcurrentWrites); + } + { + builder.field(MAX_BUFFER_SIZE.getPreferredName(), maxBufferSize); + } + { + builder.field(HEADERS.getPreferredName(), headers); } - builder.field(FOLLOW_SHARD_INDEX_FIELD.getPreferredName(), followShardId.getIndex().getName()); - builder.field(FOLLOW_SHARD_INDEX_UUID_FIELD.getPreferredName(), followShardId.getIndex().getUUID()); - builder.field(FOLLOW_SHARD_SHARDID_FIELD.getPreferredName(), followShardId.id()); - builder.field(LEADER_SHARD_INDEX_FIELD.getPreferredName(), leaderShardId.getIndex().getName()); - builder.field(LEADER_SHARD_INDEX_UUID_FIELD.getPreferredName(), leaderShardId.getIndex().getUUID()); - builder.field(LEADER_SHARD_SHARDID_FIELD.getPreferredName(), leaderShardId.id()); - builder.field(MAX_CHUNK_SIZE.getPreferredName(), maxChunkSize); - builder.field(NUM_CONCURRENT_CHUNKS.getPreferredName(), numConcurrentChunks); - builder.field(PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST.getPreferredName(), processorMaxTranslogBytes); - builder.field(HEADERS.getPreferredName(), headers); return builder.endObject(); } @@ -168,16 +230,19 @@ public boolean equals(Object o) { return Objects.equals(leaderClusterAlias, that.leaderClusterAlias) && Objects.equals(followShardId, that.followShardId) && Objects.equals(leaderShardId, that.leaderShardId) && - maxChunkSize == that.maxChunkSize && - numConcurrentChunks == that.numConcurrentChunks && - processorMaxTranslogBytes == that.processorMaxTranslogBytes && + maxReadSize == that.maxReadSize && + maxConcurrentReads == that.maxConcurrentReads && + maxWriteSize == that.maxWriteSize && + maxConcurrentWrites == that.maxConcurrentWrites && + maxTranslogBytes == that.maxTranslogBytes && + maxBufferSize == that.maxBufferSize && Objects.equals(headers, that.headers); } @Override public int hashCode() { - return Objects.hash(leaderClusterAlias, followShardId, leaderShardId, maxChunkSize, numConcurrentChunks, - processorMaxTranslogBytes, headers); + return Objects.hash(leaderClusterAlias, followShardId, leaderShardId, maxReadSize, maxConcurrentReads, + maxWriteSize, maxConcurrentWrites, maxTranslogBytes, maxBufferSize, headers); } public String toString() { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 895f53846f65a..16d5cfcb69ccc 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -5,13 +5,8 @@ */ package org.elasticsearch.xpack.ccr.action; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; -import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.admin.indices.stats.IndexStats; @@ -21,20 +16,11 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.FilterClient; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.NetworkExceptionHelper; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.common.util.concurrent.CountDown; -import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.Index; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.translog.Translog; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; @@ -42,20 +28,10 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ccr.Ccr; -import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction; -import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest; -import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; import java.util.Arrays; import java.util.Map; import java.util.Optional; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.LongConsumer; @@ -64,12 +40,6 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor { - static final long DEFAULT_BATCH_SIZE = 1024; - static final int PROCESSOR_RETRY_LIMIT = 16; - static final int DEFAULT_CONCURRENT_PROCESSORS = 1; - static final long DEFAULT_MAX_TRANSLOG_BYTES= Long.MAX_VALUE; - private static final TimeValue RETRY_TIMEOUT = TimeValue.timeValueMillis(500); - private final Client client; private final ThreadPool threadPool; @@ -100,86 +70,34 @@ public void validate(ShardFollowTask params, ClusterState clusterState) { protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId, PersistentTasksCustomMetaData.PersistentTask taskInProgress, Map headers) { - return new ShardFollowNodeTask(id, type, action, getDescription(taskInProgress), parentTaskId, headers); + ShardFollowTask params = taskInProgress.getParams(); + logger.info("{} Creating node task to track leader shard {}, params [{}]", + params.getFollowShardId(), params.getLeaderShardId(), params); + + final Client leaderClient; + if (params.getLeaderClusterAlias() != null) { + leaderClient = wrapClient(client.getRemoteClusterClient(params.getLeaderClusterAlias()), params); + } else { + leaderClient = wrapClient(client, params); + } + Client followerClient = wrapClient(client, params); + BiConsumer scheduler = + (delay, command) -> threadPool.schedule(delay, Ccr.CCR_THREAD_POOL_NAME, command); + return new ShardFollowNodeTask(id, type, action, getDescription(taskInProgress), parentTaskId, headers, + leaderClient, followerClient, params, scheduler); } @Override protected void nodeOperation(final AllocatedPersistentTask task, final ShardFollowTask params, final PersistentTaskState state) { ShardFollowNodeTask shardFollowNodeTask = (ShardFollowNodeTask) task; - Client leaderClient = wrapClient(params.getLeaderClusterAlias() != null ? - this.client.getRemoteClusterClient(params.getLeaderClusterAlias()) : this.client, params); - Client followerClient = wrapClient(this.client, params); - IndexMetadataVersionChecker imdVersionChecker = new IndexMetadataVersionChecker(params.getLeaderShardId().getIndex(), - params.getFollowShardId().getIndex(), client, leaderClient); - logger.info("[{}] initial leader mapping with follower mapping syncing", params); - imdVersionChecker.updateMapping(1L /* Force update, version is initially 0L */, e -> { - if (e == null) { - logger.info("Starting shard following [{}]", params); - fetchGlobalCheckpoint(followerClient, params.getFollowShardId(), - followGlobalCheckPoint -> { - shardFollowNodeTask.updateProcessedGlobalCheckpoint(followGlobalCheckPoint); - prepare(leaderClient, followerClient, shardFollowNodeTask, params, followGlobalCheckPoint, imdVersionChecker); - }, task::markAsFailed); - } else { - shardFollowNodeTask.markAsFailed(e); - } - }); - } - - void prepare(Client leaderClient, Client followerClient, ShardFollowNodeTask task, ShardFollowTask params, - long followGlobalCheckPoint, - IndexMetadataVersionChecker imdVersionChecker) { - if (task.isRunning() == false) { - // TODO: need better cancellation control - return; - } - - final ShardId leaderShard = params.getLeaderShardId(); - final ShardId followerShard = params.getFollowShardId(); - fetchGlobalCheckpoint(leaderClient, leaderShard, leaderGlobalCheckPoint -> { - // TODO: check if both indices have the same history uuid - if (leaderGlobalCheckPoint == followGlobalCheckPoint) { - logger.debug("{} no write operations to fetch", followerShard); - retry(leaderClient, followerClient, task, params, followGlobalCheckPoint, imdVersionChecker); - } else { - assert followGlobalCheckPoint < leaderGlobalCheckPoint : "followGlobalCheckPoint [" + followGlobalCheckPoint + - "] is not below leaderGlobalCheckPoint [" + leaderGlobalCheckPoint + "]"; - logger.debug("{} fetching write operations, leaderGlobalCheckPoint={}, followGlobalCheckPoint={}", followerShard, - leaderGlobalCheckPoint, followGlobalCheckPoint); - Executor ccrExecutor = threadPool.executor(Ccr.CCR_THREAD_POOL_NAME); - Consumer handler = e -> { - if (e == null) { - task.updateProcessedGlobalCheckpoint(leaderGlobalCheckPoint); - prepare(leaderClient, followerClient, task, params, leaderGlobalCheckPoint, imdVersionChecker); - } else { - task.markAsFailed(e); - } - }; - ChunksCoordinator coordinator = new ChunksCoordinator(followerClient, leaderClient, ccrExecutor, imdVersionChecker, - params.getMaxChunkSize(), params.getNumConcurrentChunks(), params.getProcessorMaxTranslogBytes(), leaderShard, - followerShard, handler); - coordinator.createChucks(followGlobalCheckPoint, leaderGlobalCheckPoint); - coordinator.start(); - } + logger.info("{} Started to track leader shard {}", params.getFollowShardId(), params.getLeaderShardId()); + fetchGlobalCheckpoint(shardFollowNodeTask.leaderClient, params.getLeaderShardId(), leaderGlobalCheckPoint -> { + fetchGlobalCheckpoint(shardFollowNodeTask.followerClient, params.getFollowShardId(), followGlobalCheckPoint -> { + shardFollowNodeTask.start(leaderGlobalCheckPoint, followGlobalCheckPoint); + }, task::markAsFailed); }, task::markAsFailed); } - private void retry(Client leaderClient, Client followerClient, ShardFollowNodeTask task, ShardFollowTask params, - long followGlobalCheckPoint, - IndexMetadataVersionChecker imdVersionChecker) { - threadPool.schedule(RETRY_TIMEOUT, Ccr.CCR_THREAD_POOL_NAME, new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - task.markAsFailed(e); - } - - @Override - protected void doRun() throws Exception { - prepare(leaderClient, followerClient, task, params, followGlobalCheckPoint, imdVersionChecker); - } - }); - } - private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer handler, Consumer errorHandler) { client.admin().indices().stats(new IndicesStatsRequest().indices(shardId.getIndexName()), ActionListener.wrap(r -> { IndexStats indexStats = r.getIndex(shardId.getIndexName()); @@ -197,229 +115,7 @@ private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer }, errorHandler)); } - static class ChunksCoordinator { - - private static final Logger LOGGER = Loggers.getLogger(ChunksCoordinator.class); - - private final Client followerClient; - private final Client leaderClient; - private final Executor ccrExecutor; - private final IndexMetadataVersionChecker imdVersionChecker; - - private final long batchSize; - private final int concurrentProcessors; - private final long processorMaxTranslogBytes; - private final ShardId leaderShard; - private final ShardId followerShard; - private final Consumer handler; - - private final CountDown countDown; - private final Queue chunks = new ConcurrentLinkedQueue<>(); - private final AtomicReference failureHolder = new AtomicReference<>(); - - ChunksCoordinator(Client followerClient, Client leaderClient, Executor ccrExecutor, IndexMetadataVersionChecker imdVersionChecker, - long batchSize, int concurrentProcessors, long processorMaxTranslogBytes, ShardId leaderShard, - ShardId followerShard, Consumer handler) { - this.followerClient = followerClient; - this.leaderClient = leaderClient; - this.ccrExecutor = ccrExecutor; - this.imdVersionChecker = imdVersionChecker; - this.batchSize = batchSize; - this.concurrentProcessors = concurrentProcessors; - this.processorMaxTranslogBytes = processorMaxTranslogBytes; - this.leaderShard = leaderShard; - this.followerShard = followerShard; - this.handler = handler; - this.countDown = new CountDown(concurrentProcessors); - } - - /** - * Creates chunks of the specified range, inclusive. - * - * @param from the lower end of the range (inclusive) - * @param to the upper end of the range (inclusive) - */ - void createChucks(final long from, final long to) { - LOGGER.debug("{} Creating chunks for operation range [{}] to [{}]", leaderShard, from, to); - for (long i = from; i < to; i += batchSize) { - long v2 = i + batchSize <= to ? i + batchSize - 1 : to; - chunks.add(new long[]{i, v2}); - } - } - - void start() { - LOGGER.debug("{} Start coordination of [{}] chunks with [{}] concurrent processors", - leaderShard, chunks.size(), concurrentProcessors); - for (int i = 0; i < concurrentProcessors; i++) { - ccrExecutor.execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - assert e != null; - LOGGER.error(() -> new ParameterizedMessage("{} Failure starting processor", leaderShard), e); - postProcessChuck(e); - } - - @Override - protected void doRun() throws Exception { - processNextChunk(); - } - }); - } - } - - void processNextChunk() { - long[] chunk = chunks.poll(); - if (chunk == null) { - postProcessChuck(null); - return; - } - LOGGER.debug("{} Processing chunk [{}/{}]", leaderShard, chunk[0], chunk[1]); - Consumer processorHandler = e -> { - if (e == null) { - LOGGER.debug("{} Successfully processed chunk [{}/{}]", leaderShard, chunk[0], chunk[1]); - processNextChunk(); - } else { - LOGGER.error(() -> new ParameterizedMessage("{} Failure processing chunk [{}/{}]", - leaderShard, chunk[0], chunk[1]), e); - postProcessChuck(e); - } - }; - ChunkProcessor processor = new ChunkProcessor(leaderClient, followerClient, chunks, ccrExecutor, imdVersionChecker, - leaderShard, followerShard, processorHandler); - processor.start(chunk[0], chunk[1], processorMaxTranslogBytes); - } - - void postProcessChuck(Exception e) { - if (failureHolder.compareAndSet(null, e) == false) { - Exception firstFailure = failureHolder.get(); - firstFailure.addSuppressed(e); - } - if (countDown.countDown()) { - handler.accept(failureHolder.get()); - } - } - - Queue getChunks() { - return chunks; - } - - } - - static class ChunkProcessor { - - private final Client leaderClient; - private final Client followerClient; - private final Queue chunks; - private final Executor ccrExecutor; - private final BiConsumer> indexVersionChecker; - - private final ShardId leaderShard; - private final ShardId followerShard; - private final Consumer handler; - final AtomicInteger retryCounter = new AtomicInteger(0); - - ChunkProcessor(Client leaderClient, Client followerClient, Queue chunks, Executor ccrExecutor, - BiConsumer> indexVersionChecker, - ShardId leaderShard, ShardId followerShard, Consumer handler) { - this.leaderClient = leaderClient; - this.followerClient = followerClient; - this.chunks = chunks; - this.ccrExecutor = ccrExecutor; - this.indexVersionChecker = indexVersionChecker; - this.leaderShard = leaderShard; - this.followerShard = followerShard; - this.handler = handler; - } - - void start(final long from, final long to, final long maxTranslogsBytes) { - ShardChangesAction.Request request = new ShardChangesAction.Request(leaderShard); - // Treat -1 as 0, because shard changes api min_seq_no is inclusive and therefore it doesn't allow a negative min_seq_no - // (If no indexing has happened in leader shard then global checkpoint is -1.) - request.setMinSeqNo(Math.max(0, from)); - request.setMaxSeqNo(to); - request.setMaxTranslogsBytes(maxTranslogsBytes); - leaderClient.execute(ShardChangesAction.INSTANCE, request, new ActionListener() { - @Override - public void onResponse(ShardChangesAction.Response response) { - handleResponse(to, response); - } - - @Override - public void onFailure(Exception e) { - assert e != null; - if (shouldRetry(e)) { - if (retryCounter.incrementAndGet() <= PROCESSOR_RETRY_LIMIT) { - start(from, to, maxTranslogsBytes); - } else { - handler.accept(new ElasticsearchException("retrying failed [" + retryCounter.get() + - "] times, aborting...", e)); - } - } else { - handler.accept(e); - } - } - }); - } - - void handleResponse(final long to, final ShardChangesAction.Response response) { - if (response.getOperations().length != 0) { - Translog.Operation lastOp = response.getOperations()[response.getOperations().length - 1]; - boolean maxByteLimitReached = lastOp.seqNo() < to; - if (maxByteLimitReached) { - // add a new entry to the queue for the operations that couldn't be fetched in the current shard changes api call: - chunks.add(new long[]{lastOp.seqNo() + 1, to}); - } - } - ccrExecutor.execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - assert e != null; - handler.accept(e); - } - - @Override - protected void doRun() throws Exception { - indexVersionChecker.accept(response.getIndexMetadataVersion(), e -> { - if (e != null) { - if (shouldRetry(e) && retryCounter.incrementAndGet() <= PROCESSOR_RETRY_LIMIT) { - handleResponse(to, response); - } else { - handler.accept(new ElasticsearchException("retrying failed [" + retryCounter.get() + - "] times, aborting...", e)); - } - return; - } - final BulkShardOperationsRequest request = new BulkShardOperationsRequest(followerShard, response.getOperations()); - followerClient.execute(BulkShardOperationsAction.INSTANCE, request, - new ActionListener() { - @Override - public void onResponse(final BulkShardOperationsResponse bulkShardOperationsResponse) { - handler.accept(null); - } - - @Override - public void onFailure(final Exception e) { - // No retry mechanism here, because if a failure is being redirected to this place it is considered - // non recoverable. - assert e != null; - handler.accept(e); - } - } - ); - }); - } - }); - } - - boolean shouldRetry(Exception e) { - // TODO: What other exceptions should be retried? - return NetworkExceptionHelper.isConnectException(e) || - NetworkExceptionHelper.isCloseConnectionException(e); - } - - } - - static Client wrapClient(Client client, ShardFollowTask shardFollowTask) { + private static Client wrapClient(Client client, ShardFollowTask shardFollowTask) { if (shardFollowTask.getHeaders().isEmpty()) { return client; } else { @@ -446,76 +142,4 @@ private static ThreadContext.StoredContext stashWithHeaders(ThreadContext thread return storedContext; } - static final class IndexMetadataVersionChecker implements BiConsumer> { - - private static final Logger LOGGER = Loggers.getLogger(IndexMetadataVersionChecker.class); - - private final Client followClient; - private final Client leaderClient; - private final Index leaderIndex; - private final Index followIndex; - private final AtomicLong currentIndexMetadataVersion; - private final Semaphore updateMappingSemaphore = new Semaphore(1); - - IndexMetadataVersionChecker(Index leaderIndex, Index followIndex, Client followClient, Client leaderClient) { - this.followClient = followClient; - this.leaderIndex = leaderIndex; - this.followIndex = followIndex; - this.leaderClient = leaderClient; - this.currentIndexMetadataVersion = new AtomicLong(); - } - - public void accept(Long minimumRequiredIndexMetadataVersion, Consumer handler) { - if (currentIndexMetadataVersion.get() >= minimumRequiredIndexMetadataVersion) { - LOGGER.debug("Current index metadata version [{}] is higher or equal than minimum required index metadata version [{}]", - currentIndexMetadataVersion.get(), minimumRequiredIndexMetadataVersion); - handler.accept(null); - } else { - updateMapping(minimumRequiredIndexMetadataVersion, handler); - } - } - - void updateMapping(long minimumRequiredIndexMetadataVersion, Consumer handler) { - try { - updateMappingSemaphore.acquire(); - } catch (InterruptedException e) { - handler.accept(e); - return; - } - if (currentIndexMetadataVersion.get() >= minimumRequiredIndexMetadataVersion) { - updateMappingSemaphore.release(); - LOGGER.debug("Current index metadata version [{}] is higher or equal than minimum required index metadata version [{}]", - currentIndexMetadataVersion.get(), minimumRequiredIndexMetadataVersion); - handler.accept(null); - return; - } - - ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); - clusterStateRequest.clear(); - clusterStateRequest.metaData(true); - clusterStateRequest.indices(leaderIndex.getName()); - - leaderClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> { - IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex); - assert indexMetaData.getMappings().size() == 1; - MappingMetaData mappingMetaData = indexMetaData.getMappings().iterator().next().value; - - PutMappingRequest putMappingRequest = new PutMappingRequest(followIndex.getName()); - putMappingRequest.type(mappingMetaData.type()); - putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON); - followClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap(putMappingResponse -> { - currentIndexMetadataVersion.set(indexMetaData.getVersion()); - updateMappingSemaphore.release(); - handler.accept(null); - }, e -> { - updateMappingSemaphore.release(); - handler.accept(e); - })); - }, e -> { - updateMappingSemaphore.release(); - handler.accept(e); - })); - } - } - } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsResponse.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsResponse.java index 62612e4bb4b3a..c4d26d96905b1 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsResponse.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsResponse.java @@ -7,12 +7,39 @@ import org.elasticsearch.action.support.WriteResponse; import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; public final class BulkShardOperationsResponse extends ReplicationResponse implements WriteResponse { + private long localCheckpoint; + + BulkShardOperationsResponse(long localCheckPoint) { + this.localCheckpoint = localCheckPoint; + } + + BulkShardOperationsResponse() { + } + + public long getLocalCheckpoint() { + return localCheckpoint; + } + @Override public void setForcedRefresh(final boolean forcedRefresh) { + } + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + localCheckpoint = in.readZLong(); } + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeZLong(localCheckpoint); + } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index 81cbf042037c1..9048d121a0e96 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -103,7 +103,8 @@ static WritePrimaryResult(replicaRequest, new BulkShardOperationsResponse(), location, null, primary, logger); + long localCheckPoint = primary.getLocalCheckpoint(); + return new WritePrimaryResult<>(replicaRequest, new BulkShardOperationsResponse(localCheckPoint), location, null, primary, logger); } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowIndexAction.java index 235308f902926..cc1a82f5787f5 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowIndexAction.java @@ -40,16 +40,22 @@ static Request createRequest(RestRequest restRequest) { Request request = new Request(); request.setLeaderIndex(restRequest.param("leader_index")); request.setFollowIndex(restRequest.param("index")); - if (restRequest.hasParam(ShardFollowTask.MAX_CHUNK_SIZE.getPreferredName())) { - request.setBatchSize(Long.valueOf(restRequest.param(ShardFollowTask.MAX_CHUNK_SIZE.getPreferredName()))); + if (restRequest.hasParam(ShardFollowTask.MAX_READ_SIZE.getPreferredName())) { + request.setMaxReadSize(Integer.valueOf(restRequest.param(ShardFollowTask.MAX_READ_SIZE.getPreferredName()))); } - if (restRequest.hasParam(ShardFollowTask.NUM_CONCURRENT_CHUNKS.getPreferredName())) { - request.setConcurrentProcessors(Integer.valueOf(restRequest.param(ShardFollowTask.NUM_CONCURRENT_CHUNKS.getPreferredName()))); + if (restRequest.hasParam(ShardFollowTask.MAX_CONCURRENT_READS.getPreferredName())) { + request.setMaxConcurrentReads(Integer.valueOf(restRequest.param(ShardFollowTask.MAX_CONCURRENT_READS.getPreferredName()))); } - if (restRequest.hasParam(ShardFollowTask.PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST.getPreferredName())) { - long value = Long.valueOf(restRequest.param(ShardFollowTask.PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST.getPreferredName())); + if (restRequest.hasParam(ShardFollowTask.MAX_TRANSLOG_BYTES_PER_REQUEST.getPreferredName())) { + long value = Long.valueOf(restRequest.param(ShardFollowTask.MAX_TRANSLOG_BYTES_PER_REQUEST.getPreferredName())); request.setProcessorMaxTranslogBytes(value); } + if (restRequest.hasParam(ShardFollowTask.MAX_WRITE_SIZE.getPreferredName())) { + request.setMaxWriteSize(Integer.valueOf(restRequest.param(ShardFollowTask.MAX_WRITE_SIZE.getPreferredName()))); + } + if (restRequest.hasParam(ShardFollowTask.MAX_CONCURRENT_WRITES.getPreferredName())) { + request.setMaxConcurrentWrites(Integer.valueOf(restRequest.param(ShardFollowTask.MAX_CONCURRENT_WRITES.getPreferredName()))); + } return request; } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java index ba9855b58736e..7c58be62e5c5e 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java @@ -9,13 +9,20 @@ import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.analysis.common.CommonAnalysisPlugin; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.support.XContentMapValues; @@ -24,6 +31,7 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.MockHttpTransport; @@ -37,6 +45,7 @@ import org.elasticsearch.xpack.core.XPackSettings; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -45,10 +54,14 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import static java.util.Collections.singletonMap; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -90,8 +103,8 @@ protected Collection> transportClientPlugins() { // this emulates what the CCR persistent task will do for pulling public void testGetOperationsBasedOnGlobalSequenceId() throws Exception { client().admin().indices().prepareCreate("index") - .setSettings(Settings.builder().put("index.number_of_shards", 1)) - .get(); + .setSettings(Settings.builder().put("index.number_of_shards", 1)) + .get(); client().prepareIndex("index", "doc", "1").setSource("{}", XContentType.JSON).get(); client().prepareIndex("index", "doc", "2").setSource("{}", XContentType.JSON).get(); @@ -147,9 +160,9 @@ public void testGetOperationsBasedOnGlobalSequenceId() throws Exception { public void testFollowIndex() throws Exception { final int numberOfPrimaryShards = randomIntBetween(1, 3); final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, - Collections.singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); - ensureGreen("index1"); + ensureYellow("index1"); final FollowIndexAction.Request followRequest = new FollowIndexAction.Request(); followRequest.setLeaderIndex("index1"); @@ -209,9 +222,9 @@ public void testFollowIndex() throws Exception { public void testSyncMappings() throws Exception { final String leaderIndexSettings = getIndexSettings(2, - Collections.singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); - ensureGreen("index1"); + ensureYellow("index1"); final FollowIndexAction.Request followRequest = new FollowIndexAction.Request(); followRequest.setLeaderIndex("index1"); @@ -229,7 +242,7 @@ public void testSyncMappings() throws Exception { assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(firstBatchNumDocs))); MappingMetaData mappingMetaData = client().admin().indices().prepareGetMappings("index2").get().getMappings() - .get("index2").get("doc"); + .get("index2").get("doc"); assertThat(XContentMapValues.extractValue("properties.f.type", mappingMetaData.sourceAsMap()), equalTo("integer")); assertThat(XContentMapValues.extractValue("properties.k", mappingMetaData.sourceAsMap()), nullValue()); @@ -240,41 +253,136 @@ public void testSyncMappings() throws Exception { } assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, - equalTo(firstBatchNumDocs + secondBatchNumDocs))); + equalTo(firstBatchNumDocs + secondBatchNumDocs))); mappingMetaData = client().admin().indices().prepareGetMappings("index2").get().getMappings() - .get("index2").get("doc"); + .get("index2").get("doc"); assertThat(XContentMapValues.extractValue("properties.f.type", mappingMetaData.sourceAsMap()), equalTo("integer")); assertThat(XContentMapValues.extractValue("properties.k.type", mappingMetaData.sourceAsMap()), equalTo("long")); + unfollowIndex("index2"); + } - final UnfollowIndexAction.Request unfollowRequest = new UnfollowIndexAction.Request(); - unfollowRequest.setFollowIndex("index2"); - client().execute(UnfollowIndexAction.INSTANCE, unfollowRequest).get(); + public void testFollowIndex_backlog() throws Exception { + String leaderIndexSettings = getIndexSettings(3, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); + BulkProcessor.Listener listener = new BulkProcessor.Listener() { + @Override + public void beforeBulk(long executionId, BulkRequest request) {} - assertBusy(() -> { - final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - assertThat(tasks.tasks().size(), equalTo(0)); + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {} - ListTasksRequest listTasksRequest = new ListTasksRequest(); - listTasksRequest.setDetailed(true); - ListTasksResponse listTasksResponse = client().admin().cluster().listTasks(listTasksRequest).get(); - int numNodeTasks = 0; - for (TaskInfo taskInfo : listTasksResponse.getTasks()) { - if (taskInfo.getAction().startsWith(ListTasksAction.NAME) == false) { - numNodeTasks++; + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) {} + }; + BulkProcessor bulkProcessor = BulkProcessor.builder(client(), listener) + .setBulkActions(100) + .setConcurrentRequests(4) + .build(); + AtomicBoolean run = new AtomicBoolean(true); + Thread thread = new Thread(() -> { + int counter = 0; + while (run.get()) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", counter++); + IndexRequest indexRequest = new IndexRequest("index1", "doc") + .source(source, XContentType.JSON) + .timeout(TimeValue.timeValueSeconds(1)); + bulkProcessor.add(indexRequest); + } + }); + thread.start(); + + // Waiting for some document being index before following the index: + int maxReadSize = randomIntBetween(128, 2048); + long numDocsIndexed = Math.min(3000 * 2, randomLongBetween(maxReadSize, maxReadSize * 10)); + atLeastDocsIndexed("index1", numDocsIndexed / 3); + + final FollowIndexAction.Request followRequest = new FollowIndexAction.Request(); + followRequest.setLeaderIndex("index1"); + followRequest.setFollowIndex("index2"); + followRequest.setMaxReadSize(maxReadSize); + followRequest.setMaxConcurrentReads(randomIntBetween(2, 10)); + followRequest.setMaxConcurrentWrites(randomIntBetween(2, 10)); + CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(); + createAndFollowRequest.setFollowRequest(followRequest); + client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get(); + + atLeastDocsIndexed("index1", numDocsIndexed); + run.set(false); + thread.join(); + assertThat(bulkProcessor.awaitClose(1L, TimeUnit.MINUTES), is(true)); + + assertSameDocCount("index1", "index2"); + unfollowIndex("index2"); + } + + public void testFollowIndexAndCloseNode() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(3); + String leaderIndexSettings = getIndexSettings(3, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); + + String followerIndexSettings = getIndexSettings(3, singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true")); + assertAcked(client().admin().indices().prepareCreate("index2").setSource(followerIndexSettings, XContentType.JSON)); + ensureGreen("index1", "index2"); + + AtomicBoolean run = new AtomicBoolean(true); + Thread thread = new Thread(() -> { + int counter = 0; + while (run.get()) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", counter++); + try { + client().prepareIndex("index1", "doc") + .setSource(source, XContentType.JSON) + .setTimeout(TimeValue.timeValueSeconds(1)) + .get(); + } catch (Exception e) { + logger.error("Error while indexing into leader index", e); } } - assertThat(numNodeTasks, equalTo(0)); }); + thread.start(); + + final FollowIndexAction.Request followRequest = new FollowIndexAction.Request(); + followRequest.setLeaderIndex("index1"); + followRequest.setFollowIndex("index2"); + followRequest.setMaxReadSize(randomIntBetween(32, 2048)); + followRequest.setMaxConcurrentReads(randomIntBetween(2, 10)); + client().execute(FollowIndexAction.INSTANCE, followRequest).get(); + + long maxNumDocsReplicated = Math.min(3000, randomLongBetween(followRequest.getMaxReadSize(), followRequest.getMaxReadSize() * 10)); + long minNumDocsReplicated = maxNumDocsReplicated / 3L; + logger.info("waiting for at least [{}] documents to be indexed and then stop a random data node", minNumDocsReplicated); + awaitBusy(() -> { + SearchRequest request = new SearchRequest("index2"); + request.source(new SearchSourceBuilder().size(0)); + SearchResponse response = client().search(request).actionGet(); + if (response.getHits().getTotalHits() >= minNumDocsReplicated) { + try { + internalCluster().stopRandomNonMasterNode(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return true; + } else { + return false; + } + }, 30, TimeUnit.SECONDS); + + logger.info("waiting for at least [{}] documents to be indexed", maxNumDocsReplicated); + atLeastDocsIndexed("index2", maxNumDocsReplicated); + run.set(false); + thread.join(); + + assertSameDocCount("index1", "index2"); + unfollowIndex("index2"); } public void testFollowIndexWithNestedField() throws Exception { final String leaderIndexSettings = - getIndexSettingsWithNestedMapping(1, Collections.singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + getIndexSettingsWithNestedMapping(1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); final String followerIndexSettings = - getIndexSettingsWithNestedMapping(1, Collections.singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true")); + getIndexSettingsWithNestedMapping(1, singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true")); assertAcked(client().admin().indices().prepareCreate("index2").setSource(followerIndexSettings, XContentType.JSON)); ensureGreen("index1", "index2"); @@ -311,16 +419,7 @@ public void testFollowIndexWithNestedField() throws Exception { equalTo(Collections.singletonList(value))); }); } - - final UnfollowIndexAction.Request unfollowRequest = new UnfollowIndexAction.Request(); - unfollowRequest.setFollowIndex("index2"); - client().execute(UnfollowIndexAction.INSTANCE, unfollowRequest).get(); - - assertBusy(() -> { - final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - assertThat(tasks.tasks().size(), equalTo(0)); - }); + unfollowIndex("index2"); } public void testUnfollowNonExistingIndex() { @@ -347,6 +446,43 @@ public void testFollowNonExistentIndex() throws Exception { expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest).actionGet()); } + public void testFollowIndex_lowMaxTranslogBytes() throws Exception { + final String leaderIndexSettings = getIndexSettings(1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); + ensureYellow("index1"); + + final int numDocs = 1024; + logger.info("Indexing [{}] docs", numDocs); + for (int i = 0; i < numDocs; i++) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); + client().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); + } + + final FollowIndexAction.Request followRequest = new FollowIndexAction.Request(); + followRequest.setProcessorMaxTranslogBytes(1024); + followRequest.setLeaderIndex("index1"); + followRequest.setFollowIndex("index2"); + + final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(); + createAndFollowRequest.setFollowRequest(followRequest); + client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get(); + + final Map firstBatchNumDocsPerShard = new HashMap<>(); + final ShardStats[] firstBatchShardStats = client().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); + for (final ShardStats shardStats : firstBatchShardStats) { + if (shardStats.getShardRouting().primary()) { + long value = shardStats.getStats().getIndexing().getTotal().getIndexCount() - 1; + firstBatchNumDocsPerShard.put(shardStats.getShardRouting().shardId(), value); + } + } + + assertBusy(assertTask(1, firstBatchNumDocsPerShard)); + for (int i = 0; i < numDocs; i++) { + assertBusy(assertExpectedDocumentRunnable(i)); + } + unfollowIndex("index2"); + } + private CheckedRunnable assertTask(final int numberOfPrimaryShards, final Map numDocsPerShard) { return () -> { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); @@ -362,7 +498,7 @@ private CheckedRunnable assertTask(final int numberOfPrimaryShards, f List taskInfos = listTasksResponse.getTasks(); assertThat(taskInfos.size(), equalTo(numberOfPrimaryShards)); Collection> shardFollowTasks = - taskMetadata.findTasks(ShardFollowTask.NAME, Objects::nonNull); + taskMetadata.findTasks(ShardFollowTask.NAME, Objects::nonNull); for (PersistentTasksCustomMetaData.PersistentTask shardFollowTask : shardFollowTasks) { final ShardFollowTask shardFollowTaskParams = (ShardFollowTask) shardFollowTask.getParams(); TaskInfo taskInfo = null; @@ -376,9 +512,9 @@ private CheckedRunnable assertTask(final int numberOfPrimaryShards, f assertThat(taskInfo, notNullValue()); ShardFollowNodeTask.Status status = (ShardFollowNodeTask.Status) taskInfo.getStatus(); assertThat(status, notNullValue()); - assertThat( - status.getProcessedGlobalCheckpoint(), - equalTo(numDocsPerShard.get(shardFollowTaskParams.getLeaderShardId()))); + assertThat("incorrect global checkpoint " + shardFollowTaskParams, + status.getProcessedGlobalCheckpoint(), + equalTo(numDocsPerShard.get(shardFollowTaskParams.getLeaderShardId()))); } }; } @@ -422,6 +558,7 @@ private String getIndexSettings(final int numberOfPrimaryShards, final Map additionalSetting : additionalIndexSettings.entrySet()) { builder.field(additionalSetting.getKey(), additionalSetting.getValue()); } @@ -502,4 +639,29 @@ private String getIndexSettingsWithNestedMapping(final int numberOfPrimaryShards } return settings; } + + private void atLeastDocsIndexed(String index, long numDocsReplicated) throws InterruptedException { + logger.info("waiting for at least [{}] documents to be indexed into index [{}]", numDocsReplicated, index); + awaitBusy(() -> { + refresh(index); + SearchRequest request = new SearchRequest(index); + request.source(new SearchSourceBuilder().size(0)); + SearchResponse response = client().search(request).actionGet(); + return response.getHits().getTotalHits() >= numDocsReplicated; + }, 30, TimeUnit.SECONDS); + } + + private void assertSameDocCount(String index1, String index2) throws Exception { + refresh(index1); + SearchRequest request1 = new SearchRequest(index1); + request1.source(new SearchSourceBuilder().size(0)); + SearchResponse response1 = client().search(request1).actionGet(); + assertBusy(() -> { + refresh(index2); + SearchRequest request2 = new SearchRequest(index2); + request2.source(new SearchSourceBuilder().size(0)); + SearchResponse response2 = client().search(request2).actionGet(); + assertThat(response2.getHits().getTotalHits(), equalTo(response1.getHits().getTotalHits())); + }); + } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java deleted file mode 100644 index 9af0d93e9e2bc..0000000000000 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java +++ /dev/null @@ -1,407 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ccr.action; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.client.AdminClient; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.ClusterAdminClient; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.translog.Translog; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.ChunkProcessor; -import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.ChunksCoordinator; -import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.IndexMetadataVersionChecker; -import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction; -import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest; -import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; - -import java.net.ConnectException; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; -import java.util.function.Consumer; - -import static org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.PROCESSOR_RETRY_LIMIT; -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.CoreMatchers.nullValue; -import static org.hamcrest.CoreMatchers.sameInstance; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.same; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -public class ChunksCoordinatorTests extends ESTestCase { - - public void testCreateChunks() { - Client client = mock(Client.class); - Executor ccrExecutor = Runnable::run; - ShardId leaderShardId = new ShardId("index1", "index1", 0); - ShardId followShardId = new ShardId("index2", "index1", 0); - - IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(), - followShardId.getIndex(), client, client); - ChunksCoordinator coordinator = new ChunksCoordinator(client, client, ccrExecutor, checker, 1024, 1, - Long.MAX_VALUE, leaderShardId, followShardId, e -> {}); - coordinator.createChucks(0, 1023); - List result = new ArrayList<>(coordinator.getChunks()); - assertThat(result.size(), equalTo(1)); - assertThat(result.get(0)[0], equalTo(0L)); - assertThat(result.get(0)[1], equalTo(1023L)); - - coordinator.getChunks().clear(); - coordinator.createChucks(0, 2047); - result = new ArrayList<>(coordinator.getChunks()); - assertThat(result.size(), equalTo(2)); - assertThat(result.get(0)[0], equalTo(0L)); - assertThat(result.get(0)[1], equalTo(1023L)); - assertThat(result.get(1)[0], equalTo(1024L)); - assertThat(result.get(1)[1], equalTo(2047L)); - - coordinator.getChunks().clear(); - coordinator.createChucks(0, 4095); - result = new ArrayList<>(coordinator.getChunks()); - assertThat(result.size(), equalTo(4)); - assertThat(result.get(0)[0], equalTo(0L)); - assertThat(result.get(0)[1], equalTo(1023L)); - assertThat(result.get(1)[0], equalTo(1024L)); - assertThat(result.get(1)[1], equalTo(2047L)); - assertThat(result.get(2)[0], equalTo(2048L)); - assertThat(result.get(2)[1], equalTo(3071L)); - assertThat(result.get(3)[0], equalTo(3072L)); - assertThat(result.get(3)[1], equalTo(4095L)); - - coordinator.getChunks().clear(); - coordinator.createChucks(4096, 8196); - result = new ArrayList<>(coordinator.getChunks()); - assertThat(result.size(), equalTo(5)); - assertThat(result.get(0)[0], equalTo(4096L)); - assertThat(result.get(0)[1], equalTo(5119L)); - assertThat(result.get(1)[0], equalTo(5120L)); - assertThat(result.get(1)[1], equalTo(6143L)); - assertThat(result.get(2)[0], equalTo(6144L)); - assertThat(result.get(2)[1], equalTo(7167L)); - assertThat(result.get(3)[0], equalTo(7168L)); - assertThat(result.get(3)[1], equalTo(8191L)); - assertThat(result.get(4)[0], equalTo(8192L)); - assertThat(result.get(4)[1], equalTo(8196L)); - } - - public void testCoordinator() throws Exception { - Client client = createClientMock(); - - mockShardChangesApiCall(client); - mockBulkShardOperationsApiCall(client); - Executor ccrExecutor = Runnable::run; - ShardId leaderShardId = new ShardId("index1", "index1", 0); - ShardId followShardId = new ShardId("index2", "index1", 0); - - Consumer handler = e -> assertThat(e, nullValue()); - int concurrentProcessors = randomIntBetween(1, 4); - int batchSize = randomIntBetween(1, 1000); - IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(), - followShardId.getIndex(), client, client); - ChunksCoordinator coordinator = new ChunksCoordinator(client, client, ccrExecutor, checker, batchSize, - concurrentProcessors, Long.MAX_VALUE, leaderShardId, followShardId, handler); - - int numberOfOps = randomIntBetween(batchSize, batchSize * 20); - long from = randomInt(1000); - long to = from + numberOfOps - 1; - coordinator.createChucks(from, to); - int expectedNumberOfChunks = numberOfOps / batchSize; - if (numberOfOps % batchSize > 0) { - expectedNumberOfChunks++; - } - assertThat(coordinator.getChunks().size(), equalTo(expectedNumberOfChunks)); - - coordinator.start(); - assertThat(coordinator.getChunks().size(), equalTo(0)); - verify(client, times(expectedNumberOfChunks)).execute(same(ShardChangesAction.INSTANCE), - any(ShardChangesAction.Request.class), any()); - verify(client, times(expectedNumberOfChunks)).execute(same(BulkShardOperationsAction.INSTANCE), - any(BulkShardOperationsRequest.class), any()); - } - - public void testCoordinator_failure() throws Exception { - Exception expectedException = new RuntimeException("throw me"); - Client client = createClientMock(); - boolean shardChangesActionApiCallFailed; - if (randomBoolean()) { - shardChangesActionApiCallFailed = true; - doThrow(expectedException).when(client).execute(same(ShardChangesAction.INSTANCE), - any(ShardChangesAction.Request.class), any()); - } else { - shardChangesActionApiCallFailed = false; - mockShardChangesApiCall(client); - doThrow(expectedException).when(client).execute(same(BulkShardOperationsAction.INSTANCE), - any(BulkShardOperationsRequest.class), any()); - } - Executor ccrExecutor = Runnable::run; - ShardId leaderShardId = new ShardId("index1", "index1", 0); - ShardId followShardId = new ShardId("index2", "index1", 0); - - Consumer handler = e -> { - assertThat(e, notNullValue()); - assertThat(e, sameInstance(expectedException)); - }; - IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(), - followShardId.getIndex(), client, client); - ChunksCoordinator coordinator = new ChunksCoordinator(client, client, ccrExecutor, checker, 10, 1, Long.MAX_VALUE, - leaderShardId, followShardId, handler); - coordinator.createChucks(0, 19); - assertThat(coordinator.getChunks().size(), equalTo(2)); - - coordinator.start(); - assertThat(coordinator.getChunks().size(), equalTo(1)); - verify(client, times(1)).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class), - any()); - verify(client, times(shardChangesActionApiCallFailed ? 0 : 1)).execute(same(BulkShardOperationsAction.INSTANCE), - any(BulkShardOperationsRequest.class), any()); - } - - public void testCoordinator_concurrent() throws Exception { - Client client = createClientMock(); - mockShardChangesApiCall(client); - mockBulkShardOperationsApiCall(client); - Executor ccrExecutor = command -> new Thread(command).start(); - ShardId leaderShardId = new ShardId("index1", "index1", 0); - ShardId followShardId = new ShardId("index2", "index1", 0); - - AtomicBoolean calledOnceChecker = new AtomicBoolean(false); - AtomicReference failureHolder = new AtomicReference<>(); - CountDownLatch latch = new CountDownLatch(1); - Consumer handler = e -> { - if (failureHolder.compareAndSet(null, e) == false) { - // This handler should only be called once, irregardless of the number of concurrent processors - calledOnceChecker.set(true); - } - latch.countDown(); - }; - IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(), - followShardId.getIndex(), client, client); - ChunksCoordinator coordinator = new ChunksCoordinator(client, client, ccrExecutor, checker, 1000, 4, Long.MAX_VALUE, - leaderShardId, followShardId, handler); - coordinator.createChucks(0, 999999); - assertThat(coordinator.getChunks().size(), equalTo(1000)); - - coordinator.start(); - latch.await(); - assertThat(coordinator.getChunks().size(), equalTo(0)); - verify(client, times(1000)).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class), any()); - verify(client, times(1000)).execute(same(BulkShardOperationsAction.INSTANCE), any(BulkShardOperationsRequest.class), any()); - assertThat(calledOnceChecker.get(), is(false)); - } - - public void testChunkProcessor() { - Client client = createClientMock(); - Queue chunks = new LinkedList<>(); - mockShardChangesApiCall(client); - mockBulkShardOperationsApiCall(client); - Executor ccrExecutor = Runnable::run; - ShardId leaderShardId = new ShardId("index1", "index1", 0); - ShardId followShardId = new ShardId("index2", "index1", 0); - IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(), - followShardId.getIndex(), client, client); - - boolean[] invoked = new boolean[1]; - Exception[] exception = new Exception[1]; - Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; - ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, checker, leaderShardId, - followShardId, handler); - chunkProcessor.start(0, 10, Long.MAX_VALUE); - assertThat(invoked[0], is(true)); - assertThat(exception[0], nullValue()); - } - - public void testChunkProcessorRetry() { - Client client = createClientMock(); - Queue chunks = new LinkedList<>(); - mockBulkShardOperationsApiCall(client); - int testRetryLimit = randomIntBetween(1, PROCESSOR_RETRY_LIMIT - 1); - mockShardCangesApiCallWithRetry(client, testRetryLimit, new ConnectException("connection exception")); - - Executor ccrExecutor = Runnable::run; - ShardId leaderShardId = new ShardId("index1", "index1", 0); - ShardId followShardId = new ShardId("index2", "index1", 0); - IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(), - followShardId.getIndex(), client, client); - - boolean[] invoked = new boolean[1]; - Exception[] exception = new Exception[1]; - Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; - ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, checker, leaderShardId, - followShardId, handler); - chunkProcessor.start(0, 10, Long.MAX_VALUE); - assertThat(invoked[0], is(true)); - assertThat(exception[0], nullValue()); - assertThat(chunkProcessor.retryCounter.get(), equalTo(testRetryLimit + 1)); - } - - public void testChunkProcessorRetryTooManyTimes() { - Client client = createClientMock(); - Queue chunks = new LinkedList<>(); - mockBulkShardOperationsApiCall(client); - int testRetryLimit = PROCESSOR_RETRY_LIMIT + 1; - mockShardCangesApiCallWithRetry(client, testRetryLimit, new ConnectException("connection exception")); - - Executor ccrExecutor = Runnable::run; - ShardId leaderShardId = new ShardId("index1", "index1", 0); - ShardId followShardId = new ShardId("index2", "index1", 0); - IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(), - followShardId.getIndex(), client, client); - - boolean[] invoked = new boolean[1]; - Exception[] exception = new Exception[1]; - Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; - ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, checker, leaderShardId, - followShardId, handler); - chunkProcessor.start(0, 10, Long.MAX_VALUE); - assertThat(invoked[0], is(true)); - assertThat(exception[0], notNullValue()); - assertThat(exception[0].getMessage(), equalTo("retrying failed [17] times, aborting...")); - assertThat(exception[0].getCause().getMessage(), equalTo("connection exception")); - assertThat(chunkProcessor.retryCounter.get(), equalTo(testRetryLimit)); - } - - public void testChunkProcessorNoneRetryableError() { - Client client = createClientMock(); - Queue chunks = new LinkedList<>(); - mockBulkShardOperationsApiCall(client); - mockShardCangesApiCallWithRetry(client, 3, new RuntimeException("unexpected")); - - Executor ccrExecutor = Runnable::run; - ShardId leaderShardId = new ShardId("index1", "index1", 0); - ShardId followShardId = new ShardId("index2", "index1", 0); - IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(), - followShardId.getIndex(), client, client); - - boolean[] invoked = new boolean[1]; - Exception[] exception = new Exception[1]; - Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; - ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, checker, leaderShardId, - followShardId, handler); - chunkProcessor.start(0, 10, Long.MAX_VALUE); - assertThat(invoked[0], is(true)); - assertThat(exception[0], notNullValue()); - assertThat(exception[0].getMessage(), equalTo("unexpected")); - assertThat(chunkProcessor.retryCounter.get(), equalTo(0)); - } - - public void testChunkProcessorExceedMaxTranslogsBytes() { - long from = 0; - long to = 20; - long actualTo = 10; - Client client = createClientMock(); - Queue chunks = new LinkedList<>(); - doAnswer(invocation -> { - Object[] args = invocation.getArguments(); - assert args.length == 3; - @SuppressWarnings("unchecked") - ActionListener listener = (ActionListener) args[2]; - - List operations = new ArrayList<>(); - for (int i = 0; i <= actualTo; i++) { - operations.add(new Translog.NoOp(i, 1, "test")); - } - listener.onResponse(new ShardChangesAction.Response(1L, operations.toArray(new Translog.Operation[0]))); - return null; - }).when(client).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class), any()); - - mockBulkShardOperationsApiCall(client); - Executor ccrExecutor = Runnable::run; - ShardId leaderShardId = new ShardId("index1", "index1", 0); - ShardId followShardId = new ShardId("index2", "index1", 0); - IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(), - followShardId.getIndex(), client, client); - - boolean[] invoked = new boolean[1]; - Exception[] exception = new Exception[1]; - Consumer handler = e -> {invoked[0] = true;exception[0] = e;}; - BiConsumer> versionChecker = (indexVersiuon, consumer) -> consumer.accept(null); - ChunkProcessor chunkProcessor = - new ChunkProcessor(client, client, chunks, ccrExecutor, versionChecker, leaderShardId, followShardId, handler); - chunkProcessor.start(from, to, Long.MAX_VALUE); - assertThat(invoked[0], is(true)); - assertThat(exception[0], nullValue()); - assertThat(chunks.size(), equalTo(1)); - assertThat(chunks.peek()[0], equalTo(11L)); - assertThat(chunks.peek()[1], equalTo(20L)); - } - - private Client createClientMock() { - Client client = mock(Client.class); - ClusterAdminClient clusterAdminClient = mock(ClusterAdminClient.class); - AdminClient adminClient = mock(AdminClient.class); - when(adminClient.cluster()).thenReturn(clusterAdminClient); - when(client.admin()).thenReturn(adminClient); - return client; - } - - private void mockShardCangesApiCallWithRetry(Client client, int testRetryLimit, Exception e) { - int[] retryCounter = new int[1]; - doAnswer(invocation -> { - Object[] args = invocation.getArguments(); - assert args.length == 3; - ShardChangesAction.Request request = (ShardChangesAction.Request) args[1]; - @SuppressWarnings("unchecked") - ActionListener listener = (ActionListener) args[2]; - if (retryCounter[0]++ <= testRetryLimit) { - listener.onFailure(e); - } else { - long delta = request.getMaxSeqNo() - request.getMinSeqNo(); - Translog.Operation[] operations = new Translog.Operation[(int) delta]; - for (int i = 0; i < operations.length; i++) { - operations[i] = new Translog.NoOp(request.getMinSeqNo() + i, 1, "test"); - } - ShardChangesAction.Response response = new ShardChangesAction.Response(0L, operations); - listener.onResponse(response); - } - return null; - }).when(client).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class), any()); - } - - private void mockShardChangesApiCall(Client client) { - doAnswer(invocation -> { - Object[] args = invocation.getArguments(); - assert args.length == 3; - ShardChangesAction.Request request = (ShardChangesAction.Request) args[1]; - @SuppressWarnings("unchecked") - ActionListener listener = (ActionListener) args[2]; - - List operations = new ArrayList<>(); - for (long i = request.getMinSeqNo(); i <= request.getMaxSeqNo(); i++) { - operations.add(new Translog.NoOp(request.getMinSeqNo() + i, 1, "test")); - } - ShardChangesAction.Response response = new ShardChangesAction.Response(0L, operations.toArray(new Translog.Operation[0])); - listener.onResponse(response); - return null; - }).when(client).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class), any()); - } - - private void mockBulkShardOperationsApiCall(Client client) { - doAnswer(invocation -> { - Object[] args = invocation.getArguments(); - assert args.length == 3; - @SuppressWarnings("unchecked") - ActionListener listener = (ActionListener) args[2]; - listener.onResponse(new BulkShardOperationsResponse()); - return null; - }).when(client).execute(same(BulkShardOperationsAction.INSTANCE), any(BulkShardOperationsRequest.class), any()); - } - -} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexRequestTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexRequestTests.java index 9a0557b369a77..73c4a192a63f5 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexRequestTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexRequestTests.java @@ -23,9 +23,13 @@ static FollowIndexAction.Request createTestRequest() { FollowIndexAction.Request request = new FollowIndexAction.Request(); request.setLeaderIndex(randomAlphaOfLength(4)); request.setFollowIndex(randomAlphaOfLength(4)); - request.setBatchSize(randomNonNegativeLong()); - request.setConcurrentProcessors(randomIntBetween(0, Integer.MAX_VALUE)); + request.setMaxReadSize(randomIntBetween(1, Integer.MAX_VALUE)); + request.setMaxConcurrentReads(randomIntBetween(1, Integer.MAX_VALUE)); request.setProcessorMaxTranslogBytes(randomNonNegativeLong()); + request.setMaxWriteSize(randomIntBetween(1, Integer.MAX_VALUE)); + request.setMaxConcurrentWrites(randomIntBetween(1, Integer.MAX_VALUE)); + request.setMaxConcurrentWrites(randomIntBetween(1, Integer.MAX_VALUE)); + request.setMaxBufferSize(randomIntBetween(1, Integer.MAX_VALUE)); return request; } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java index 7e06ce120a86a..56440dc53f57b 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java @@ -48,7 +48,7 @@ public void testGetOperationsBetween() throws Exception { IndexShard indexShard = indexService.getShard(0); for (int iter = 0; iter < iters; iter++) { int min = randomIntBetween(0, numWrites - 1); - int max = randomIntBetween(min, numWrites - 1); + long max = randomIntBetween(min, numWrites - 1); final Translog.Operation[] operations = ShardChangesAction.getOperationsBetween(indexShard, min, max, Long.MAX_VALUE); final List seenSeqNos = Arrays.stream(operations).map(Translog.Operation::seqNo).collect(Collectors.toList()); final List expectedSeqNos = LongStream.range(min, max + 1).boxed().collect(Collectors.toList()); @@ -57,13 +57,13 @@ public void testGetOperationsBetween() throws Exception { // get operations for a range no operations exists: Exception e = expectThrows(IllegalStateException.class, - () -> ShardChangesAction.getOperationsBetween(indexShard, numWrites, numWrites + 1, Long.MAX_VALUE)); + () -> ShardChangesAction.getOperationsBetween(indexShard, numWrites, (long) numWrites + 1, Long.MAX_VALUE)); assertThat(e.getMessage(), containsString("Not all operations between min_seqno [" + numWrites + "] and max_seqno [" + (numWrites + 1) +"] found")); // get operations for a range some operations do not exist: e = expectThrows(IllegalStateException.class, - () -> ShardChangesAction.getOperationsBetween(indexShard, numWrites - 10, numWrites + 10, Long.MAX_VALUE)); + () -> ShardChangesAction.getOperationsBetween(indexShard, numWrites - 10, (long) numWrites + 10, Long.MAX_VALUE)); assertThat(e.getMessage(), containsString("Not all operations between min_seqno [" + (numWrites - 10) + "] and max_seqno [" + (numWrites + 10) +"] found")); } @@ -81,7 +81,7 @@ public void testGetOperationsBetweenWhenShardNotStarted() throws Exception { ShardRouting shardRouting = TestShardRouting.newShardRouting("index", 0, "_node_id", true, ShardRoutingState.INITIALIZING); Mockito.when(indexShard.routingEntry()).thenReturn(shardRouting); - expectThrows(IndexShardNotStartedException.class, () -> ShardChangesAction.getOperationsBetween(indexShard, 0, 1, Long.MAX_VALUE)); + expectThrows(IndexShardNotStartedException.class, () -> ShardChangesAction.getOperationsBetween(indexShard, 0, 1L, Long.MAX_VALUE)); } public void testGetOperationsBetweenExceedByteLimit() throws Exception { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesRequestTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesRequestTests.java index 3f30545576046..dfb5b11b94801 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesRequestTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesRequestTests.java @@ -32,9 +32,10 @@ public void testValidate() { assertThat(request.validate().getMessage(), containsString("minSeqNo [-1] cannot be lower than 0")); request.setMinSeqNo(4); + request.setMaxSeqNo(0L); assertThat(request.validate().getMessage(), containsString("minSeqNo [4] cannot be larger than maxSeqNo [0]")); - request.setMaxSeqNo(8); + request.setMaxSeqNo(8L); assertThat(request.validate(), nullValue()); } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java index 4902917ab53e3..2b992c8bc6926 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java @@ -13,12 +13,13 @@ public class ShardChangesResponseTests extends AbstractStreamableTestCase failureHolder = new AtomicReference<>(); + + public void testDefaults() throws Exception { + task = createShardFollowTask(ShardFollowNodeTask.DEFAULT_MAX_READ_SIZE, ShardFollowNodeTask.DEFAULT_MAX_CONCURRENT_READS, + ShardFollowNodeTask.DEFAULT_MAX_WRITE_SIZE, ShardFollowNodeTask.DEFAULT_MAX_CONCURRENT_WRITES, 10000, + ShardFollowNodeTask.DEFAULT_MAX_BUFFER_SIZE); + task.start(randomIntBetween(-1, 2048), -1); + + assertBusy(() -> { + assertThat(task.getStatus().getProcessedGlobalCheckpoint(), equalTo(10000L)); + }); + assertThat(mappingUpdateCounter.get(), equalTo(1)); + } + + public void testHitBufferLimit() throws Exception { + // Setting buffer limit to 100, so that we are sure the limit will be met + task = createShardFollowTask(ShardFollowNodeTask.DEFAULT_MAX_READ_SIZE, 3, + ShardFollowNodeTask.DEFAULT_MAX_WRITE_SIZE, 1, 10000, 100); + task.start(-1, -1); + + assertBusy(() -> { + assertThat(task.getStatus().getProcessedGlobalCheckpoint(), equalTo(10000L)); + }); + } + + public void testConcurrentReadsAndWrites() throws Exception { + task = createShardFollowTask(randomIntBetween(32, 2048), randomIntBetween(2, 10), randomIntBetween(32, 2048), + randomIntBetween(2, 10), 50000, 10240); + task.start(randomIntBetween(-1, 2048), -1); + + assertBusy(() -> { + assertThat(task.getStatus().getProcessedGlobalCheckpoint(), equalTo(50000L)); + }); + } + + public void testMappingUpdate() throws Exception { + task = createShardFollowTask(1024, 1, 1024, 1, 10000, 1024); + task.start(-1, -1); + + assertBusy(() -> { + assertThat(task.getStatus().getProcessedGlobalCheckpoint(), greaterThanOrEqualTo(1000L)); + }); + imdVersion.set(2L); + assertBusy(() -> { + assertThat(task.getStatus().getProcessedGlobalCheckpoint(), equalTo(10000L)); + }); + assertThat(mappingUpdateCounter.get(), equalTo(2)); + } + + public void testOccasionalApiFailure() throws Exception { + task = createShardFollowTask(1024, 1, 1024, 1, 10000, 1024); + task.start(-1, -1); + randomlyFailWithRetryableError.set(true); + assertBusy(() -> { + assertThat(task.getStatus().getProcessedGlobalCheckpoint(), equalTo(10000L)); + }); + assertThat(failedRequests.get(), greaterThan(0)); + } + + public void testNotAllExpectedOpsReturned() throws Exception { + task = createShardFollowTask(1024, 1, 1024, 1, 10000, 1024); + task.start(-1, -1); + randomlyTruncateRequests.set(true); + assertBusy(() -> { + assertThat(task.getStatus().getProcessedGlobalCheckpoint(), equalTo(10000L)); + }); + assertThat(truncatedRequests.get(), greaterThan(0)); + } + + ShardFollowNodeTask createShardFollowTask(int maxReadSize, int maxConcurrentReads, int maxWriteSize, + int maxConcurrentWrites, int leaderGlobalCheckpoint, int bufferLimit) { + imdVersion = new AtomicLong(1L); + mappingUpdateCounter = new AtomicInteger(0); + randomlyTruncateRequests = new AtomicBoolean(false); + truncatedRequests = new AtomicInteger(); + randomlyFailWithRetryableError = new AtomicBoolean(false); + failedRequests = new AtomicInteger(0); + AtomicBoolean stopped = new AtomicBoolean(false); + ShardFollowTask params = new ShardFollowTask(null, new ShardId("follow_index", "", 0), + new ShardId("leader_index", "", 0), maxReadSize, maxConcurrentReads, ShardFollowNodeTask.DEFAULT_MAX_TRANSLOG_BYTES, + maxWriteSize, maxConcurrentWrites, bufferLimit, Collections.emptyMap()); + + BiConsumer scheduler = (delay, task) -> { + try { + Thread.sleep(delay.millis()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + Thread thread = new Thread(task); + thread.start(); + }; + AtomicInteger readCounter = new AtomicInteger(); + AtomicInteger writeCounter = new AtomicInteger(); + LocalCheckpointTracker tracker = new LocalCheckpointTracker(SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED); + return new ShardFollowNodeTask(1L, "type", ShardFollowTask.NAME, "description", null, + Collections.emptyMap(), null, null, params, scheduler) { + + @Override + protected void updateMapping(LongConsumer handler) { + mappingUpdateCounter.incrementAndGet(); + handler.accept(imdVersion.get()); + } + + @Override + protected void innerSendBulkShardOperationsRequest(Translog.Operation[] operations, LongConsumer handler, + Consumer errorHandler) { + if (randomlyFailWithRetryableError.get() && readCounter.incrementAndGet() % 5 == 0) { + failedRequests.incrementAndGet(); + errorHandler.accept(new UnavailableShardsException(params.getFollowShardId(), "test error")); + return; + } + + for(Translog.Operation op : operations) { + tracker.markSeqNoAsCompleted(op.seqNo()); + } + + // Emulate network thread and avoid SO: + Thread thread = new Thread(() -> handler.accept(tracker.getCheckpoint())); + thread.start(); + } + + @Override + protected void innerSendShardChangesRequest(long from, Long to, Consumer handler, + Consumer errorHandler) { + if (randomlyFailWithRetryableError.get() && writeCounter.incrementAndGet() % 5 == 0) { + failedRequests.incrementAndGet(); + errorHandler.accept(new UnavailableShardsException(params.getFollowShardId(), "test error")); + return; + } + + if (from < 0) { + errorHandler.accept(new IllegalArgumentException()); + return; + } + + ShardChangesAction.Response response; + if (from >= leaderGlobalCheckpoint) { + response = new ShardChangesAction.Response(1L, leaderGlobalCheckpoint, new Translog.Operation[0]); + } else { + int size = to == null ? 100 : (int) (to - from + 1); + if (randomlyTruncateRequests.get() && size > 10 && truncatedRequests.get() < 5) { + truncatedRequests.incrementAndGet(); + size = size / 2; + } + Translog.Operation[] ops = new Translog.Operation[size]; + for (int i = 0; i < ops.length; i++) { + ops[i] = new Translog.Index("doc", UUIDs.randomBase64UUID(), from + i, 0, "{}".getBytes(StandardCharsets.UTF_8)); + } + response = new ShardChangesAction.Response(imdVersion.get(), leaderGlobalCheckpoint, ops); + } + // Emulate network thread and avoid SO: + Thread thread = new Thread(() -> handler.accept(response)); + thread.start(); + } + + @Override + protected boolean isStopped() { + return stopped.get(); + } + + @Override + public void markAsCompleted() { + stopped.set(true); + } + + @Override + public void markAsFailed(Exception e) { + stopped.set(true); + failureHolder.set(e); + } + }; + } + + @After + public void cancelNodeTask() throws Exception { + if (task != null){ + task.markAsCompleted(); + assertThat(failureHolder.get(), nullValue()); + assertBusy(() -> { + ShardFollowNodeTask.Status status = task.getStatus(); + assertThat(status.getNumberOfConcurrentReads(), equalTo(0)); + assertThat(status.getNumberOfConcurrentWrites(), equalTo(0)); + }); + } + } + +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java index b128e88e63a5a..5f6370318f7f8 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java @@ -23,11 +23,16 @@ protected ShardFollowTask doParseInstance(XContentParser parser) throws IOExcept @Override protected ShardFollowTask createTestInstance() { return new ShardFollowTask( - randomAlphaOfLength(4), - new ShardId(randomAlphaOfLength(4), randomAlphaOfLength(4), randomInt(5)), - new ShardId(randomAlphaOfLength(4), randomAlphaOfLength(4), randomInt(5)), - randomIntBetween(1, Integer.MAX_VALUE), randomIntBetween(1, Integer.MAX_VALUE), - randomIntBetween(1, Integer.MAX_VALUE), randomBoolean() ? null : Collections.singletonMap("key", "value")); + randomAlphaOfLength(4), + new ShardId(randomAlphaOfLength(4), randomAlphaOfLength(4), randomInt(5)), + new ShardId(randomAlphaOfLength(4), randomAlphaOfLength(4), randomInt(5)), + randomIntBetween(1, Integer.MAX_VALUE), + randomIntBetween(1, Integer.MAX_VALUE), + randomNonNegativeLong(), + randomIntBetween(1, Integer.MAX_VALUE), + randomIntBetween(1, Integer.MAX_VALUE), + randomIntBetween(1, Integer.MAX_VALUE), + randomBoolean() ? null : Collections.singletonMap("key", "value")); } @Override