Rewrite shard follow node task logic
The current shard follow mechanism is complex and does not give us easy ways to gain visibility into the system (e.g. why we are falling behind).
The main reason it is complex is that the current design is highly asynchronous. Also, in the current model it is hard to apply backpressure
other than by reducing the number of concurrent reads from the leader shard.

This PR has the following changes:
* Rewrote the shard follow task to coordinate the shard follow mechanism between a leader and follow shard in a single-threaded manner.
  This allows for better unit testing and makes it easier to add stats.
* All write operations read from the shard changes api should be added to a buffer instead of being sent directly to the bulk shard operations api.
  This allows us to apply backpressure: a limit controls how many write ops are allowed in the buffer, and no new reads are performed until the
  number of buffered ops drops below that limit (see the sketch after this list).
* The shard changes api includes the current global checkpoint on the leader shard copy. This makes reading a more self-sufficient process,
  instead of relying on a background thread to fetch the leader shard's global checkpoint.
* Reading write operations from the leader shard (via the shard changes api) is now a separate step from writing them (via the bulk shard operations api),
  whereas before a read would immediately result in a write.
* The bulk shard operations api returns the local checkpoint on the follow primary shard, to keep the shard follow task up to date with what has been written.
* Moved the shard follow logic that was previously in ShardFollowTasksExecutor to ShardFollowNodeTask.
* Moved over the changes from #31242 to make the shard follow mechanism resilient to node and shard failures.
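
To make the coordination and backpressure idea above concrete, here is a minimal, single-threaded sketch in Java. It is not the actual ShardFollowNodeTask implementation: all class names, limits, and the assumption that ops arrive contiguously are illustrative only.

```java
// Minimal, single-threaded sketch of the buffer/backpressure idea described above.
// This is NOT the actual ShardFollowNodeTask; all names, types and limits here are made up.
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class FollowCoordinationSketch {

    /** One write operation read from the leader, identified by its sequence number. */
    record Op(long seqNo, String source) {}

    /** What a read from the leader returns: ops plus the leader's current global checkpoint. */
    record ReadResult(long leaderGlobalCheckpoint, List<Op> ops) {}

    interface Leader {
        ReadResult read(long fromSeqNo, int maxReadSize);
    }

    interface Follower {
        long bulkWrite(List<Op> ops); // returns the follower's local checkpoint
    }

    private static final int MAX_READ_SIZE = 1024;     // analogous to maxReadSize
    private static final int MAX_BUFFER_SIZE = 10_000; // analogous to maxBufferSize

    private final Queue<Op> buffer = new ArrayDeque<>();
    private long lastReadSeqNo = -1;            // highest seq_no fetched from the leader so far
    private long leaderGlobalCheckpoint = 0;    // reported by the last read
    private long followerLocalCheckpoint = -1;  // reported by the last bulk write

    /** One coordination step: reading and writing are separate, and reads back off while the buffer is full. */
    void coordinate(Leader leader, Follower follower) {
        // Backpressure: only fetch more ops while the buffer is below the configured limit.
        if (buffer.size() < MAX_BUFFER_SIZE && lastReadSeqNo < leaderGlobalCheckpoint) {
            ReadResult result = leader.read(lastReadSeqNo + 1, MAX_READ_SIZE);
            leaderGlobalCheckpoint = result.leaderGlobalCheckpoint(); // no background checkpoint poller needed
            for (Op op : result.ops()) {
                buffer.add(op);
                lastReadSeqNo = Math.max(lastReadSeqNo, op.seqNo());
            }
        }
        // Writing is a separate step: drain the buffer into one bulk request and record the result.
        if (buffer.isEmpty() == false) {
            List<Op> bulk = new ArrayList<>(buffer);
            buffer.clear();
            followerLocalCheckpoint = follower.bulkWrite(bulk);
        }
    }
}
```

The point of the design is that reads and writes are decoupled by the buffer, so slow writes (or a full buffer) naturally throttle reads from the leader.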

Relates to #30086
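
The first changed file below renames the request parameters (batchSize/concurrentProcessors become maxReadSize/maxConcurrentReads) and adds maxWriteSize, maxConcurrentWrites and maxBufferSize. A hypothetical configuration of that request might look as follows; the no-arg constructor, the setLeaderIndex call and the concrete values are assumptions, only the remaining setters appear in this change.

```java
// Hypothetical configuration of the follow request with the new limits; the no-arg constructor,
// setLeaderIndex(...) and the values are assumptions, the other setters come from this change.
static Request newFollowRequest(String leaderIndex, String followIndex) {
    Request request = new Request();
    request.setLeaderIndex(leaderIndex);     // assumed counterpart of setFollowIndex(...)
    request.setFollowIndex(followIndex);
    request.setMaxReadSize(1024);            // ops fetched per read from the leader
    request.setMaxConcurrentReads(4);        // concurrent outstanding reads
    request.setMaxWriteSize(1024);           // ops sent per bulk shard operations request
    request.setMaxConcurrentWrites(2);       // concurrent outstanding bulk writes
    request.setMaxBufferSize(10_000);        // backpressure: pause reads once this many ops are buffered
    request.setProcessorMaxTranslogBytes(64 * 1024 * 1024); // byte cap per read
    return request;
}
```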
martijnvg committed Jun 27, 2018
1 parent a55f614 commit 516dcb7
Showing 17 changed files with 1,108 additions and 961 deletions.
@@ -71,9 +71,12 @@ public static class Request extends ActionRequest {

private String leaderIndex;
private String followIndex;
private long batchSize = ShardFollowTasksExecutor.DEFAULT_BATCH_SIZE;
private int concurrentProcessors = ShardFollowTasksExecutor.DEFAULT_CONCURRENT_PROCESSORS;
private long processorMaxTranslogBytes = ShardFollowTasksExecutor.DEFAULT_MAX_TRANSLOG_BYTES;
private int maxReadSize = ShardFollowNodeTask.DEFAULT_MAX_READ_SIZE;
private int maxConcurrentReads = ShardFollowNodeTask.DEFAULT_MAX_CONCURRENT_READS;
private long processorMaxTranslogBytes = ShardFollowNodeTask.DEFAULT_MAX_TRANSLOG_BYTES;
private int maxWriteSize = ShardFollowNodeTask.DEFAULT_MAX_WRITE_SIZE;
private int maxConcurrentWrites = ShardFollowNodeTask.DEFAULT_MAX_CONCURRENT_WRITES;
private int maxBufferSize = ShardFollowNodeTask.DEFAULT_MAX_BUFFER_SIZE;

public String getLeaderIndex() {
return leaderIndex;
@@ -91,23 +94,23 @@ public void setFollowIndex(String followIndex) {
this.followIndex = followIndex;
}

public long getBatchSize() {
return batchSize;
public int getMaxReadSize() {
return maxReadSize;
}

public void setBatchSize(long batchSize) {
if (batchSize < 1) {
throw new IllegalArgumentException("Illegal batch_size [" + batchSize + "]");
public void setMaxReadSize(int maxReadSize) {
if (maxReadSize < 1) {
throw new IllegalArgumentException("Illegal batch_size [" + maxReadSize + "]");
}

this.batchSize = batchSize;
this.maxReadSize = maxReadSize;
}

public void setConcurrentProcessors(int concurrentProcessors) {
if (concurrentProcessors < 1) {
public void setMaxConcurrentReads(int maxConcurrentReads) {
if (maxConcurrentReads < 1) {
throw new IllegalArgumentException("concurrent_processors must be larger than 0");
}
this.concurrentProcessors = concurrentProcessors;
this.maxConcurrentReads = maxConcurrentReads;
}

public void setProcessorMaxTranslogBytes(long processorMaxTranslogBytes) {
@@ -117,6 +120,39 @@ public void setProcessorMaxTranslogBytes(long processorMaxTranslogBytes) {
this.processorMaxTranslogBytes = processorMaxTranslogBytes;
}

public int getMaxWriteSize() {
return maxWriteSize;
}

public void setMaxWriteSize(int maxWriteSize) {
if (maxWriteSize < 1) {
throw new IllegalArgumentException("maxWriteSize must be larger than 0");
}
this.maxWriteSize = maxWriteSize;
}

public int getMaxConcurrentWrites() {
return maxConcurrentWrites;
}

public void setMaxConcurrentWrites(int maxConcurrentWrites) {
if (maxConcurrentWrites < 1) {
throw new IllegalArgumentException("maxConcurrentWrites must be larger than 0");
}
this.maxConcurrentWrites = maxConcurrentWrites;
}

public int getMaxBufferSize() {
return maxBufferSize;
}

public void setMaxBufferSize(int maxBufferSize) {
if (maxBufferSize < 1) {
throw new IllegalArgumentException("maxBufferSize must be larger than 0");
}
this.maxBufferSize = maxBufferSize;
}

@Override
public ActionRequestValidationException validate() {
return null;
@@ -127,36 +163,46 @@ public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
leaderIndex = in.readString();
followIndex = in.readString();
batchSize = in.readVLong();
concurrentProcessors = in.readVInt();
maxReadSize = in.readVInt();
maxConcurrentReads = in.readVInt();
processorMaxTranslogBytes = in.readVLong();
maxWriteSize = in.readVInt();
maxConcurrentWrites = in.readVInt();
maxBufferSize = in.readVInt();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(leaderIndex);
out.writeString(followIndex);
out.writeVLong(batchSize);
out.writeVInt(concurrentProcessors);
out.writeVInt(maxReadSize);
out.writeVInt(maxConcurrentReads);
out.writeVLong(processorMaxTranslogBytes);
out.writeVInt(maxWriteSize);
out.writeVInt(maxConcurrentWrites);
out.writeVInt(maxBufferSize);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return batchSize == request.batchSize &&
concurrentProcessors == request.concurrentProcessors &&
return maxReadSize == request.maxReadSize &&
maxConcurrentReads == request.maxConcurrentReads &&
processorMaxTranslogBytes == request.processorMaxTranslogBytes &&
maxWriteSize == request.maxWriteSize &&
maxConcurrentWrites == request.maxConcurrentWrites &&
maxBufferSize == request.maxBufferSize &&
Objects.equals(leaderIndex, request.leaderIndex) &&
Objects.equals(followIndex, request.followIndex);
}

@Override
public int hashCode() {
return Objects.hash(leaderIndex, followIndex, batchSize, concurrentProcessors, processorMaxTranslogBytes);
return Objects.hash(leaderIndex, followIndex, maxReadSize, maxConcurrentReads, processorMaxTranslogBytes,
maxWriteSize, maxConcurrentWrites, maxBufferSize);
}
}

@@ -254,7 +300,8 @@ void start(Request request, String clusterNameAlias, IndexMetaData leaderIndexMe
ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias,
new ShardId(followIndexMetadata.getIndex(), shardId),
new ShardId(leaderIndexMetadata.getIndex(), shardId),
request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes, filteredHeaders);
request.maxReadSize, request.maxConcurrentReads, request.processorMaxTranslogBytes,
request.maxWriteSize, request.maxConcurrentWrites, request.maxBufferSize, filteredHeaders);
persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask,
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
@Override
@@ -54,9 +54,9 @@ public Response newResponse() {
public static class Request extends SingleShardRequest<Request> {

private long minSeqNo;
private long maxSeqNo;
private Long maxSeqNo;
private ShardId shardId;
private long maxTranslogsBytes = ShardFollowTasksExecutor.DEFAULT_MAX_TRANSLOG_BYTES;
private long maxTranslogsBytes = ShardFollowNodeTask.DEFAULT_MAX_TRANSLOG_BYTES;

public Request(ShardId shardId) {
super(shardId.getIndexName());
@@ -78,11 +78,11 @@ public void setMinSeqNo(long minSeqNo) {
this.minSeqNo = minSeqNo;
}

public long getMaxSeqNo() {
public Long getMaxSeqNo() {
return maxSeqNo;
}

public void setMaxSeqNo(long maxSeqNo) {
public void setMaxSeqNo(Long maxSeqNo) {
this.maxSeqNo = maxSeqNo;
}

@@ -100,7 +100,7 @@ public ActionRequestValidationException validate() {
if (minSeqNo < 0) {
validationException = addValidationError("minSeqNo [" + minSeqNo + "] cannot be lower than 0", validationException);
}
if (maxSeqNo < minSeqNo) {
if (maxSeqNo != null && maxSeqNo < minSeqNo) {
validationException = addValidationError("minSeqNo [" + minSeqNo + "] cannot be larger than maxSeqNo ["
+ maxSeqNo + "]", validationException);
}
@@ -115,7 +115,7 @@ public void readFrom(StreamInput in) throws IOException {
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
minSeqNo = in.readVLong();
maxSeqNo = in.readVLong();
maxSeqNo = in.readOptionalLong();
shardId = ShardId.readShardId(in);
maxTranslogsBytes = in.readVLong();
}
@@ -124,7 +124,7 @@ public void readFrom(StreamInput in) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVLong(minSeqNo);
out.writeVLong(maxSeqNo);
out.writeOptionalLong(maxSeqNo);
shardId.writeTo(out);
out.writeVLong(maxTranslogsBytes);
}
@@ -136,7 +136,7 @@ public boolean equals(final Object o) {
if (o == null || getClass() != o.getClass()) return false;
final Request request = (Request) o;
return minSeqNo == request.minSeqNo &&
maxSeqNo == request.maxSeqNo &&
Objects.equals(maxSeqNo, request.maxSeqNo) &&
Objects.equals(shardId, request.shardId) &&
maxTranslogsBytes == request.maxTranslogsBytes;
}
@@ -150,20 +150,26 @@ public int hashCode() {
public static final class Response extends ActionResponse {

private long indexMetadataVersion;
private long leaderGlobalCheckpoint;
private Translog.Operation[] operations;

Response() {
}

Response(long indexMetadataVersion, final Translog.Operation[] operations) {
Response(long indexMetadataVersion, long leaderGlobalCheckpoint, final Translog.Operation[] operations) {
this.indexMetadataVersion = indexMetadataVersion;
this.leaderGlobalCheckpoint = leaderGlobalCheckpoint;
this.operations = operations;
}

public long getIndexMetadataVersion() {
return indexMetadataVersion;
}

public long getLeaderGlobalCheckpoint() {
return leaderGlobalCheckpoint;
}

public Translog.Operation[] getOperations() {
return operations;
}
@@ -172,13 +178,15 @@ public Translog.Operation[] getOperations() {
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
indexMetadataVersion = in.readVLong();
leaderGlobalCheckpoint = in.readZLong();
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVLong(indexMetadataVersion);
out.writeZLong(leaderGlobalCheckpoint);
out.writeArray(Translog.Operation::writeOperation, operations);
}

@@ -188,13 +196,15 @@ public boolean equals(final Object o) {
if (o == null || getClass() != o.getClass()) return false;
final Response response = (Response) o;
return indexMetadataVersion == response.indexMetadataVersion &&
Arrays.equals(operations, response.operations);
leaderGlobalCheckpoint == response.leaderGlobalCheckpoint &&
Arrays.equals(operations, response.operations);
}

@Override
public int hashCode() {
int result = 1;
result += Objects.hashCode(indexMetadataVersion);
result += Objects.hashCode(leaderGlobalCheckpoint);
result += Arrays.hashCode(operations);
return result;
}
@@ -222,14 +232,16 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc
IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
IndexShard indexShard = indexService.getShard(request.getShard().id());
final long indexMetaDataVersion = clusterService.state().metaData().index(shardId.getIndex()).getVersion();

// The following shard generates this request based on the global checkpoint on the primary copy on the leader.
// Although this value might not have been synced to all replica copies on the leader, the requesting range
// is guaranteed to be at most the local-checkpoint of any shard copies on the leader.
assert request.maxSeqNo <= indexShard.getLocalCheckpoint() : "invalid request from_seqno=[" + request.minSeqNo + "]," +
" to_seqno=[" + request.maxSeqNo + "], local_checkpoint=[" + indexShard.getLocalCheckpoint() + "]";
assert request.maxSeqNo == null || request.maxSeqNo <= indexShard.getLocalCheckpoint() : "invalid request from_seqno=[" +
request.minSeqNo + "]," + " to_seqno=[" + request.maxSeqNo + "], local_checkpoint=[" +
indexShard.getLocalCheckpoint() + "]";
final Translog.Operation[] operations =
getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo, request.maxTranslogsBytes);
return new Response(indexMetaDataVersion, operations);
return new Response(indexMetaDataVersion, indexShard.getGlobalCheckpoint(), operations);
}

@Override
@@ -254,14 +266,15 @@ protected Response newResponse() {

private static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0];

static Translog.Operation[] getOperationsBetween(IndexShard indexShard, long minSeqNo, long maxSeqNo,
static Translog.Operation[] getOperationsBetween(IndexShard indexShard, long minSeqNo, Long maxSeqNo,
long byteLimit) throws IOException {
if (indexShard.state() != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(indexShard.shardId(), indexShard.state());
}
int seenBytes = 0;
final List<Translog.Operation> operations = new ArrayList<>();
try (Translog.Snapshot snapshot = indexShard.newLuceneChangesSnapshot("ccr", minSeqNo, maxSeqNo, true)) {
long max = maxSeqNo != null ? maxSeqNo : minSeqNo + 1000;
try (Translog.Snapshot snapshot = indexShard.newLuceneChangesSnapshot("ccr", minSeqNo, max, true)) {
Translog.Operation op;
while ((op = snapshot.next()) != null) {
if (op.getSource() == null) {
@@ -274,6 +287,15 @@ static Translog.Operation[] getOperationsBetween(IndexShard indexShard, long min
break;
}
}
} catch (IllegalStateException e) {
// TODO: handle peek reads better.
// Should this optional upper bound leak into the newLuceneChangesSnapshot(...) method?
if (maxSeqNo != null) {
throw e;
} else if (e.getMessage().contains("prematurely terminated last_seen_seqno") == false) {
// Only fail if there are gaps between the ops.
throw e;
}
}
return operations.toArray(EMPTY_OPERATIONS_ARRAY);
}
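
To illustrate how the new response field and the now-optional maxSeqNo might be used by a caller, a hedged follower-side sketch follows. Only the Request/Response members used here (the Request constructor, setMinSeqNo, setMaxSeqNo, getLeaderGlobalCheckpoint, getOperations) come from this change; the helper methods, the buffering, and the assumption that the enclosing action class is named ShardChangesAction are illustrative only.

```java
// Hypothetical follower-side helpers; only the shard changes Request/Response members used here
// come from this change, everything else (method names, buffering) is assumed for illustration.
static ShardChangesAction.Request nextPeekRead(ShardId leaderShardId, long lastProcessedSeqNo) {
    ShardChangesAction.Request request = new ShardChangesAction.Request(leaderShardId);
    request.setMinSeqNo(lastProcessedSeqNo + 1); // continue right after what has already been read
    request.setMaxSeqNo(null);                   // no upper bound: a "peek" read that may stop early
    return request;
}

// Buffer the returned ops (the bulk write is a separate step) and remember the leader's
// global checkpoint, which now arrives with every read instead of via a background poll.
static long onReadResponse(ShardChangesAction.Response response, Queue<Translog.Operation> buffer) {
    Collections.addAll(buffer, response.getOperations());
    return response.getLeaderGlobalCheckpoint();
}
```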