-
Notifications
You must be signed in to change notification settings - Fork 24.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Rewrite shard follow node task logic #31581
Merged
martijnvg
merged 46 commits into
elastic:ccr
from
martijnvg:ccr_follow_shard_task_rewrite_2
Jul 10, 2018
Merged
Changes from 34 commits
Commits
Show all changes
46 commits
Select commit
Hold shift + click to select a range
516dcb7
Rewrite shard follow node task logic
martijnvg 9dec920
Made to param required like before and changed validation in shard ch…
martijnvg 7ae1f1d
changed log levels
martijnvg 5510731
renamed maxTranslogBytes to maxOperationSizeInBytes
martijnvg f3d58e0
changed shard changes api to be size based instead of range based
martijnvg 0330e5f
start the node task only with followGlobalCheckpoint
martijnvg adc829f
iter
martijnvg 0b8e6a7
coordinate reads should not schedule, but if there is budget at least…
martijnvg 7b3fb30
s/leaderGlobalCheckpoint/globalCheckpoint
martijnvg a0059ae
Separate the code that uses client and makes remote calls from ShardF…
martijnvg a356fbe
iter, exception handling.
martijnvg fa6bb6f
iter
martijnvg e73d990
adjusted test
martijnvg 942c777
improve coordinateReads()
martijnvg 9cdf320
improved assert
martijnvg b5c6187
Merge remote-tracking branch 'es/ccr' into ccr_follow_shard_task_rewr…
martijnvg 5cd19ad
Merge remote-tracking branch 'elastic/ccr' into pr/31581
jasontedor e9d61c1
iter
martijnvg 9b8de5f
fixed compile error and checkstyle violation
martijnvg 60fb9a8
increase timeouts
martijnvg 75b07aa
Merge remote-tracking branch 'es/ccr' into ccr_follow_shard_task_rewr…
martijnvg 2ec6ac4
rename + jdocs
martijnvg fa1b52f
iter
martijnvg 932eeaa
keep strict validation
martijnvg aa11dda
iter2
martijnvg b100b97
implemented custom WritePrimaryResult to include global checkpoint in…
martijnvg 9048913
made retry timeout configurable
martijnvg fb8d47e
added to test whether we always return at least 1 op when
martijnvg b0e3d49
changed retry delay in tests to 10ms
martijnvg b6f0320
made idelShardRetryDelay a parameter in the follow api
martijnvg d17a4b3
added comment
martijnvg 877d566
Merge remote-tracking branch 'es/ccr' into ccr_follow_shard_task_rewr…
martijnvg 53fe1c2
made parameter names consistent and collapsed maxReadSize and maxWrit…
martijnvg 813498a
also apply maxBatchSizeInBytes on write side
martijnvg cb03c9a
iter
martijnvg 7682b8c
iter2
martijnvg 80c895d
easier to understand CcrWritePrimaryResult logic
martijnvg e6c5422
added some more stats
martijnvg 3eb0056
Merge remote-tracking branch 'es/ccr' into ccr_follow_shard_task_rewr…
martijnvg efee1af
changed ops in bulk shard ops request from array to list
martijnvg 2367e06
one set of {} is good enough
martijnvg d9086fe
only peek read when there is budget
martijnvg bf251af
variable rename
martijnvg 775de27
call coordinateReads() from handleWriteResponse()
martijnvg cf63047
renamed fields and variables
martijnvg cb09fd7
the if check is not needed, calling coordinateReads() and coordinateW…
martijnvg File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,7 @@ | |
import org.elasticsearch.common.io.stream.StreamOutput; | ||
import org.elasticsearch.common.settings.Setting; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.unit.TimeValue; | ||
import org.elasticsearch.index.IndexSettings; | ||
import org.elasticsearch.index.IndexingSlowLog; | ||
import org.elasticsearch.index.SearchSlowLog; | ||
|
@@ -71,50 +72,57 @@ public static class Request extends ActionRequest { | |
|
||
private String leaderIndex; | ||
private String followIndex; | ||
private long batchSize = ShardFollowTasksExecutor.DEFAULT_BATCH_SIZE; | ||
private int concurrentProcessors = ShardFollowTasksExecutor.DEFAULT_CONCURRENT_PROCESSORS; | ||
private long processorMaxTranslogBytes = ShardFollowTasksExecutor.DEFAULT_MAX_TRANSLOG_BYTES; | ||
|
||
public String getLeaderIndex() { | ||
return leaderIndex; | ||
} | ||
|
||
public void setLeaderIndex(String leaderIndex) { | ||
this.leaderIndex = leaderIndex; | ||
} | ||
|
||
public String getFollowIndex() { | ||
return followIndex; | ||
} | ||
private int maxBatchOperationCount; | ||
private int maxConcurrentReadBatches; | ||
private long maxOperationSizeInBytes; | ||
private int maxConcurrentWriteBatches; | ||
private int maxWriteBufferSize; | ||
private TimeValue retryTimeout; | ||
private TimeValue idleShardRetryDelay; | ||
|
||
public Request(String leaderIndex, String followIndex, int maxBatchOperationCount, int maxConcurrentReadBatches, | ||
long maxOperationSizeInBytes, int maxConcurrentWriteBatches, int maxWriteBufferSize, | ||
TimeValue retryTimeout, TimeValue idleShardRetryDelay) { | ||
if (maxBatchOperationCount < 1) { | ||
throw new IllegalArgumentException("maxBatchOperationCount must be larger than 0"); | ||
} | ||
if (maxConcurrentReadBatches < 1) { | ||
throw new IllegalArgumentException("concurrent_processors must be larger than 0"); | ||
} | ||
if (maxOperationSizeInBytes <= 0) { | ||
throw new IllegalArgumentException("processor_max_translog_bytes must be larger than 0"); | ||
} | ||
if (maxConcurrentWriteBatches < 1) { | ||
throw new IllegalArgumentException("maxConcurrentWriteBatches must be larger than 0"); | ||
} | ||
if (maxWriteBufferSize < 1) { | ||
throw new IllegalArgumentException("maxWriteBufferSize must be larger than 0"); | ||
} | ||
|
||
public void setFollowIndex(String followIndex) { | ||
this.followIndex = followIndex; | ||
this.leaderIndex = Objects.requireNonNull(leaderIndex); | ||
this.followIndex = Objects.requireNonNull(followIndex); | ||
this.maxBatchOperationCount = maxBatchOperationCount; | ||
this.maxConcurrentReadBatches = maxConcurrentReadBatches; | ||
this.maxOperationSizeInBytes = maxOperationSizeInBytes; | ||
this.maxConcurrentWriteBatches = maxConcurrentWriteBatches; | ||
this.maxWriteBufferSize = maxWriteBufferSize; | ||
this.retryTimeout = retryTimeout; | ||
this.idleShardRetryDelay = idleShardRetryDelay; | ||
} | ||
|
||
public long getBatchSize() { | ||
return batchSize; | ||
Request() { | ||
} | ||
|
||
public void setBatchSize(long batchSize) { | ||
if (batchSize < 1) { | ||
throw new IllegalArgumentException("Illegal batch_size [" + batchSize + "]"); | ||
} | ||
|
||
this.batchSize = batchSize; | ||
public String getLeaderIndex() { | ||
return leaderIndex; | ||
} | ||
|
||
public void setConcurrentProcessors(int concurrentProcessors) { | ||
if (concurrentProcessors < 1) { | ||
throw new IllegalArgumentException("concurrent_processors must be larger than 0"); | ||
} | ||
this.concurrentProcessors = concurrentProcessors; | ||
public String getFollowIndex() { | ||
return followIndex; | ||
} | ||
|
||
public void setProcessorMaxTranslogBytes(long processorMaxTranslogBytes) { | ||
if (processorMaxTranslogBytes <= 0) { | ||
throw new IllegalArgumentException("processor_max_translog_bytes must be larger than 0"); | ||
} | ||
this.processorMaxTranslogBytes = processorMaxTranslogBytes; | ||
public int getMaxBatchOperationCount() { | ||
return maxBatchOperationCount; | ||
} | ||
|
||
@Override | ||
|
@@ -127,36 +135,49 @@ public void readFrom(StreamInput in) throws IOException { | |
super.readFrom(in); | ||
leaderIndex = in.readString(); | ||
followIndex = in.readString(); | ||
batchSize = in.readVLong(); | ||
concurrentProcessors = in.readVInt(); | ||
processorMaxTranslogBytes = in.readVLong(); | ||
maxBatchOperationCount = in.readVInt(); | ||
maxConcurrentReadBatches = in.readVInt(); | ||
maxOperationSizeInBytes = in.readVLong(); | ||
maxConcurrentWriteBatches = in.readVInt(); | ||
maxWriteBufferSize = in.readVInt(); | ||
retryTimeout = in.readOptionalTimeValue(); | ||
idleShardRetryDelay = in.readOptionalTimeValue(); | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
super.writeTo(out); | ||
out.writeString(leaderIndex); | ||
out.writeString(followIndex); | ||
out.writeVLong(batchSize); | ||
out.writeVInt(concurrentProcessors); | ||
out.writeVLong(processorMaxTranslogBytes); | ||
out.writeVInt(maxBatchOperationCount); | ||
out.writeVInt(maxConcurrentReadBatches); | ||
out.writeVLong(maxOperationSizeInBytes); | ||
out.writeVInt(maxConcurrentWriteBatches); | ||
out.writeVInt(maxWriteBufferSize); | ||
out.writeOptionalTimeValue(retryTimeout); | ||
out.writeOptionalTimeValue(idleShardRetryDelay); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (this == o) return true; | ||
if (o == null || getClass() != o.getClass()) return false; | ||
Request request = (Request) o; | ||
return batchSize == request.batchSize && | ||
concurrentProcessors == request.concurrentProcessors && | ||
processorMaxTranslogBytes == request.processorMaxTranslogBytes && | ||
return maxBatchOperationCount == request.maxBatchOperationCount && | ||
maxConcurrentReadBatches == request.maxConcurrentReadBatches && | ||
maxOperationSizeInBytes == request.maxOperationSizeInBytes && | ||
maxConcurrentWriteBatches == request.maxConcurrentWriteBatches && | ||
maxWriteBufferSize == request.maxWriteBufferSize && | ||
Objects.equals(retryTimeout, request.retryTimeout) && | ||
Objects.equals(idleShardRetryDelay, request.idleShardRetryDelay) && | ||
Objects.equals(leaderIndex, request.leaderIndex) && | ||
Objects.equals(followIndex, request.followIndex); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(leaderIndex, followIndex, batchSize, concurrentProcessors, processorMaxTranslogBytes); | ||
return Objects.hash(leaderIndex, followIndex, maxBatchOperationCount, maxConcurrentReadBatches, maxOperationSizeInBytes, | ||
maxConcurrentWriteBatches, maxWriteBufferSize, retryTimeout, idleShardRetryDelay); | ||
} | ||
} | ||
|
||
|
@@ -197,7 +218,7 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li | |
ClusterState localClusterState = clusterService.state(); | ||
IndexMetaData followIndexMetadata = localClusterState.getMetaData().index(request.followIndex); | ||
|
||
String[] indices = new String[]{request.getLeaderIndex()}; | ||
String[] indices = new String[]{request.leaderIndex}; | ||
Map<String, List<String>> remoteClusterIndices = remoteClusterService.groupClusterIndices(indices, s -> false); | ||
if (remoteClusterIndices.containsKey(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { | ||
// Following an index in local cluster, so use local cluster state to fetch leader IndexMetaData: | ||
|
@@ -251,10 +272,21 @@ void start(Request request, String clusterNameAlias, IndexMetaData leaderIndexMe | |
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));for (int i = 0; i < numShards; i++) { | ||
final int shardId = i; | ||
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId; | ||
TimeValue retryTimeout = ShardFollowNodeTask.DEFAULT_RETRY_TIMEOUT; | ||
if (request.retryTimeout != null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think these can be null? |
||
retryTimeout = request.retryTimeout; | ||
} | ||
TimeValue idleShardRetryDelay = ShardFollowNodeTask.DEFAULT_IDLE_SHARD_RETRY_DELAY; | ||
if (request.idleShardRetryDelay != null) { | ||
idleShardRetryDelay = request.idleShardRetryDelay; | ||
} | ||
|
||
ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias, | ||
new ShardId(followIndexMetadata.getIndex(), shardId), | ||
new ShardId(leaderIndexMetadata.getIndex(), shardId), | ||
request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes, filteredHeaders); | ||
request.maxBatchOperationCount, request.maxConcurrentReadBatches, request.maxOperationSizeInBytes, | ||
request.maxConcurrentWriteBatches, request.maxWriteBufferSize, retryTimeout, idleShardRetryDelay, | ||
filteredHeaders); | ||
persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask, | ||
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() { | ||
@Override | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we also check the time values are non null?