Rewrite shard follow node task logic #31581

Merged
merged 46 commits into from
Jul 10, 2018
Changes from 34 commits
516dcb7
Rewrite shard follow node task logic
martijnvg Jun 22, 2018
9dec920
Made to param required like before and changed validation in shard ch…
martijnvg Jun 28, 2018
7ae1f1d
changed log levels
martijnvg Jun 28, 2018
5510731
renamed maxTranslogBytes to maxOperationSizeInBytes
martijnvg Jun 28, 2018
f3d58e0
changed shard changes api to be size based instead of range based
martijnvg Jun 28, 2018
0330e5f
start the node task only with followGlobalCheckpoint
martijnvg Jun 28, 2018
adc829f
iter
martijnvg Jun 28, 2018
0b8e6a7
coordinate reads should not schedule, but if there is budget at least…
martijnvg Jun 28, 2018
7b3fb30
s/leaderGlobalCheckpoint/globalCheckpoint
martijnvg Jun 28, 2018
a0059ae
Separate the code that uses client and makes remote calls from ShardF…
martijnvg Jun 28, 2018
a356fbe
iter, exception handling.
martijnvg Jun 28, 2018
fa6bb6f
iter
martijnvg Jun 28, 2018
e73d990
adjusted test
martijnvg Jun 28, 2018
942c777
improve coordinateReads()
martijnvg Jun 29, 2018
9cdf320
improved assert
martijnvg Jun 29, 2018
b5c6187
Merge remote-tracking branch 'es/ccr' into ccr_follow_shard_task_rewr…
martijnvg Jun 29, 2018
5cd19ad
Merge remote-tracking branch 'elastic/ccr' into pr/31581
jasontedor Jul 1, 2018
e9d61c1
iter
martijnvg Jul 2, 2018
9b8de5f
fixed compile error and checkstyle violation
martijnvg Jul 2, 2018
60fb9a8
increase timeouts
martijnvg Jul 3, 2018
75b07aa
Merge remote-tracking branch 'es/ccr' into ccr_follow_shard_task_rewr…
martijnvg Jul 5, 2018
2ec6ac4
rename + jdocs
martijnvg Jul 5, 2018
fa1b52f
iter
martijnvg Jul 5, 2018
932eeaa
keep strict validation
martijnvg Jul 5, 2018
aa11dda
iter2
martijnvg Jul 5, 2018
b100b97
implemented custom WritePrimaryResult to include global checkpoint in…
martijnvg Jul 5, 2018
9048913
made retry timeout configurable
martijnvg Jul 5, 2018
fb8d47e
added to test whether we always return at least 1 op when
martijnvg Jul 5, 2018
b0e3d49
changed retry delay in tests to 10ms
martijnvg Jul 5, 2018
b6f0320
made idelShardRetryDelay a parameter in the follow api
martijnvg Jul 5, 2018
d17a4b3
added comment
martijnvg Jul 5, 2018
877d566
Merge remote-tracking branch 'es/ccr' into ccr_follow_shard_task_rewr…
martijnvg Jul 6, 2018
53fe1c2
made parameter names consistent and collapsed maxReadSize and maxWrit…
martijnvg Jul 9, 2018
813498a
also apply maxBatchSizeInBytes on write side
martijnvg Jul 9, 2018
cb03c9a
iter
martijnvg Jul 9, 2018
7682b8c
iter2
martijnvg Jul 9, 2018
80c895d
easier to understand CcrWritePrimaryResult logic
martijnvg Jul 9, 2018
e6c5422
added some more stats
martijnvg Jul 9, 2018
3eb0056
Merge remote-tracking branch 'es/ccr' into ccr_follow_shard_task_rewr…
martijnvg Jul 9, 2018
efee1af
changed ops in bulk shard ops request from array to list
martijnvg Jul 9, 2018
2367e06
one set of {} is good enough
martijnvg Jul 9, 2018
d9086fe
only peek read when there is budget
martijnvg Jul 9, 2018
bf251af
variable rename
martijnvg Jul 10, 2018
775de27
call coordinateReads() from handleWriteResponse()
martijnvg Jul 10, 2018
cf63047
renamed fields and variables
martijnvg Jul 10, 2018
cb09fd7
the if check is not needed, calling coordinateReads() and coordinateW…
martijnvg Jul 10, 2018
Original file line number Diff line number Diff line change
@@ -130,13 +130,15 @@ public static class WritePrimaryResult<ReplicaRequest extends ReplicatedWriteReq
implements RespondingWriteResult {
boolean finishedAsyncActions;
public final Location location;
public final IndexShard primary;
ActionListener<Response> listener = null;

public WritePrimaryResult(ReplicaRequest request, @Nullable Response finalResponse,
@Nullable Location location, @Nullable Exception operationFailure,
IndexShard primary, Logger logger) {
super(request, finalResponse, operationFailure);
this.location = location;
this.primary = primary;
assert location == null || operationFailure == null
: "expected either failure to be null or translog location to be null, " +
"but found: [" + location + "] translog location and [" + operationFailure + "] failure";
Original file line number Diff line number Diff line change
@@ -141,14 +141,14 @@ public Translog.Operation next() throws IOException {
private void rangeCheck(Translog.Operation op) {
if (op == null) {
if (lastSeenSeqNo < toSeqNo) {
throw new IllegalStateException("Not all operations between min_seqno [" + fromSeqNo + "] " +
"and max_seqno [" + toSeqNo + "] found; prematurely terminated last_seen_seqno [" + lastSeenSeqNo + "]");
throw new IllegalStateException("Not all operations between from_seqno [" + fromSeqNo + "] " +
"and to_seqno [" + toSeqNo + "] found; prematurely terminated last_seen_seqno [" + lastSeenSeqNo + "]");
}
} else {
final long expectedSeqNo = lastSeenSeqNo + 1;
if (op.seqNo() != expectedSeqNo) {
throw new IllegalStateException("Not all operations between min_seqno [" + fromSeqNo + "] " +
"and max_seqno [" + toSeqNo + "] found; expected seqno [" + expectedSeqNo + "]; found [" + op + "]");
throw new IllegalStateException("Not all operations between from_seqno [" + fromSeqNo + "] " +
"and to_seqno [" + toSeqNo + "] found; expected seqno [" + expectedSeqNo + "]; found [" + op + "]");
}
}
}
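The check above can be tried in isolation. The following is a minimal sketch, not the Elasticsearch class: `SeqNoRangeCheck`, `accept` and `drain` are hypothetical names, operations are reduced to bare sequence numbers, and it assumes that `lastSeenSeqNo` starts at `fromSeqNo - 1` and advances after every accepted operation (the hunk does not show where that happens).

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for the snapshot's range check; not the Elasticsearch class. */
final class SeqNoRangeCheck {
    private final long fromSeqNo;
    private final long toSeqNo;
    private long lastSeenSeqNo;

    SeqNoRangeCheck(long fromSeqNo, long toSeqNo) {
        this.fromSeqNo = fromSeqNo;
        this.toSeqNo = toSeqNo;
        // Assumption: nothing has been seen yet, so the first expected seqno is fromSeqNo.
        this.lastSeenSeqNo = fromSeqNo - 1;
    }

    /** Accepts the next operation's seqno, or null once the source is exhausted. */
    void accept(Long seqNo) {
        if (seqNo == null) {
            // The source ended: every seqno up to toSeqNo must have been seen.
            if (lastSeenSeqNo < toSeqNo) {
                throw new IllegalStateException("Not all operations between from_seqno [" + fromSeqNo + "] "
                    + "and to_seqno [" + toSeqNo + "] found; prematurely terminated last_seen_seqno ["
                    + lastSeenSeqNo + "]");
            }
            return;
        }
        // Operations must arrive without gaps.
        long expectedSeqNo = lastSeenSeqNo + 1;
        if (seqNo != expectedSeqNo) {
            throw new IllegalStateException("Not all operations between from_seqno [" + fromSeqNo + "] "
                + "and to_seqno [" + toSeqNo + "] found; expected seqno [" + expectedSeqNo + "]; found ["
                + seqNo + "]");
        }
        lastSeenSeqNo = seqNo;
    }

    /** Feeds all seqnos through the check, then signals the end of the source. */
    static void drain(long fromSeqNo, long toSeqNo, List<Long> seqNos) {
        SeqNoRangeCheck check = new SeqNoRangeCheck(fromSeqNo, toSeqNo);
        for (Long seqNo : seqNos) {
            check.accept(seqNo);
        }
        check.accept(null);
    }

    public static void main(String[] args) {
        drain(0, 2, Arrays.asList(0L, 1L, 2L)); // contiguous range: passes
        try {
            drain(0, 2, Arrays.asList(0L, 2L)); // seqno 1 is missing
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Both failure modes surface as `IllegalStateException`, which is what the tests in the next file assert on via the `from_seqno` / `to_seqno` message text.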
Original file line number Diff line number Diff line change
@@ -65,7 +65,7 @@ public void testBasics() throws Exception {
try (Translog.Snapshot snapshot = engine.newLuceneChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, true)) {
IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot));
assertThat(error.getMessage(),
containsString("Not all operations between min_seqno [" + fromSeqNo + "] and max_seqno [" + toSeqNo + "] found"));
containsString("Not all operations between from_seqno [" + fromSeqNo + "] and to_seqno [" + toSeqNo + "] found"));
}
try (Translog.Snapshot snapshot = engine.newLuceneChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, false)) {
assertThat(snapshot, SnapshotMatchers.size(0));
@@ -108,19 +108,19 @@ searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), f
searcher = null;
IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot));
assertThat(error.getMessage(),
containsString("Not all operations between min_seqno [" + fromSeqNo + "] and max_seqno [" + toSeqNo + "] found"));
containsString("Not all operations between from_seqno [" + fromSeqNo + "] and to_seqno [" + toSeqNo + "] found"));
}finally {
IOUtils.close(searcher);
}
}else {
} else {
fromSeqNo = randomLongBetween(0, refreshedSeqNo);
toSeqNo = randomLongBetween(refreshedSeqNo + 1, numOps * 2);
Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(
searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, false)) {
searcher = null;
assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, refreshedSeqNo));
}finally {
} finally {
IOUtils.close(searcher);
}
searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
@@ -129,7 +129,7 @@ searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), f
searcher = null;
IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot));
assertThat(error.getMessage(),
containsString("Not all operations between min_seqno [" + fromSeqNo + "] and max_seqno [" + toSeqNo + "] found"));
containsString("Not all operations between from_seqno [" + fromSeqNo + "] and to_seqno [" + toSeqNo + "] found"));
}finally {
IOUtils.close(searcher);
}
@@ -139,7 +139,7 @@ searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), f
searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, true)) {
searcher = null;
assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, toSeqNo));
}finally {
} finally {
IOUtils.close(searcher);
}
}
Original file line number Diff line number Diff line change
@@ -84,7 +84,7 @@ public void testFollowIndex() throws Exception {
assertThat(tasks.size(), equalTo(0));
assertThat(countCcrNodeTasks(), equalTo(0));
});

followIndex("leader_cluster:" + allowedIndex, allowedIndex);
assertThat(countCcrNodeTasks(), equalTo(1));
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_xpack/ccr/_unfollow")));
@@ -95,14 +95,14 @@ public void testFollowIndex() throws Exception {
assertThat(tasks.size(), equalTo(0));
assertThat(countCcrNodeTasks(), equalTo(0));
});

createAndFollowIndex("leader_cluster:" + unallowedIndex, unallowedIndex);
// Verify that nothing has been replicated and no node tasks are running.
// These node tasks should have failed because the user
// does not have sufficient privileges.
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
verifyDocuments(adminClient(), unallowedIndex, 0);

followIndex("leader_cluster:" + unallowedIndex, unallowedIndex);
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
verifyDocuments(adminClient(), unallowedIndex, 0);
@@ -146,12 +146,14 @@ private static void refresh(String index) throws IOException {
private static void followIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_xpack/ccr/_follow");
request.addParameter("leader_index", leaderIndex);
request.addParameter("idle_shard_retry_delay", "10ms");
assertOK(client().performRequest(request));
}

private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_xpack/ccr/_create_and_follow");
request.addParameter("leader_index", leaderIndex);
request.addParameter("idle_shard_retry_delay", "10ms");
assertOK(client().performRequest(request));
}

Original file line number Diff line number Diff line change
@@ -96,12 +96,14 @@ private static void refresh(String index) throws IOException {
private static void followIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_xpack/ccr/_follow");
request.addParameter("leader_index", leaderIndex);
request.addParameter("idle_shard_retry_delay", "10ms");
assertOK(client().performRequest(request));
}

private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_xpack/ccr/_create_and_follow");
request.addParameter("leader_index", leaderIndex);
request.addParameter("idle_shard_retry_delay", "10ms");
assertOK(client().performRequest(request));
}

Original file line number Diff line number Diff line change
@@ -65,12 +65,15 @@ public static class Request extends AcknowledgedRequest<Request> {

private FollowIndexAction.Request followRequest;

public FollowIndexAction.Request getFollowRequest() {
return followRequest;
public Request(FollowIndexAction.Request followRequest) {
this.followRequest = Objects.requireNonNull(followRequest);
}

Request() {
}

public void setFollowRequest(FollowIndexAction.Request followRequest) {
this.followRequest = followRequest;
public FollowIndexAction.Request getFollowRequest() {
return followRequest;
}

@Override
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexingSlowLog;
import org.elasticsearch.index.SearchSlowLog;
@@ -71,50 +72,57 @@ public static class Request extends ActionRequest {

private String leaderIndex;
private String followIndex;
private long batchSize = ShardFollowTasksExecutor.DEFAULT_BATCH_SIZE;
private int concurrentProcessors = ShardFollowTasksExecutor.DEFAULT_CONCURRENT_PROCESSORS;
private long processorMaxTranslogBytes = ShardFollowTasksExecutor.DEFAULT_MAX_TRANSLOG_BYTES;

public String getLeaderIndex() {
return leaderIndex;
}

public void setLeaderIndex(String leaderIndex) {
this.leaderIndex = leaderIndex;
}

public String getFollowIndex() {
return followIndex;
}
private int maxBatchOperationCount;
private int maxConcurrentReadBatches;
private long maxOperationSizeInBytes;
private int maxConcurrentWriteBatches;
private int maxWriteBufferSize;
private TimeValue retryTimeout;
private TimeValue idleShardRetryDelay;

public Request(String leaderIndex, String followIndex, int maxBatchOperationCount, int maxConcurrentReadBatches,
long maxOperationSizeInBytes, int maxConcurrentWriteBatches, int maxWriteBufferSize,
TimeValue retryTimeout, TimeValue idleShardRetryDelay) {
if (maxBatchOperationCount < 1) {
throw new IllegalArgumentException("maxBatchOperationCount must be larger than 0");
}
if (maxConcurrentReadBatches < 1) {
throw new IllegalArgumentException("maxConcurrentReadBatches must be larger than 0");
}
if (maxOperationSizeInBytes <= 0) {
throw new IllegalArgumentException("maxOperationSizeInBytes must be larger than 0");
}
if (maxConcurrentWriteBatches < 1) {
throw new IllegalArgumentException("maxConcurrentWriteBatches must be larger than 0");
}
if (maxWriteBufferSize < 1) {
Review comment (Contributor): can we also check that the time values are non-null?
throw new IllegalArgumentException("maxWriteBufferSize must be larger than 0");
}

public void setFollowIndex(String followIndex) {
this.followIndex = followIndex;
this.leaderIndex = Objects.requireNonNull(leaderIndex);
this.followIndex = Objects.requireNonNull(followIndex);
this.maxBatchOperationCount = maxBatchOperationCount;
this.maxConcurrentReadBatches = maxConcurrentReadBatches;
this.maxOperationSizeInBytes = maxOperationSizeInBytes;
this.maxConcurrentWriteBatches = maxConcurrentWriteBatches;
this.maxWriteBufferSize = maxWriteBufferSize;
this.retryTimeout = retryTimeout;
this.idleShardRetryDelay = idleShardRetryDelay;
}

public long getBatchSize() {
return batchSize;
Request() {
}

public void setBatchSize(long batchSize) {
if (batchSize < 1) {
throw new IllegalArgumentException("Illegal batch_size [" + batchSize + "]");
}

this.batchSize = batchSize;
public String getLeaderIndex() {
return leaderIndex;
}

public void setConcurrentProcessors(int concurrentProcessors) {
if (concurrentProcessors < 1) {
throw new IllegalArgumentException("concurrent_processors must be larger than 0");
}
this.concurrentProcessors = concurrentProcessors;
public String getFollowIndex() {
return followIndex;
}

public void setProcessorMaxTranslogBytes(long processorMaxTranslogBytes) {
if (processorMaxTranslogBytes <= 0) {
throw new IllegalArgumentException("processor_max_translog_bytes must be larger than 0");
}
this.processorMaxTranslogBytes = processorMaxTranslogBytes;
public int getMaxBatchOperationCount() {
return maxBatchOperationCount;
}

@Override
@@ -127,36 +135,49 @@ public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
leaderIndex = in.readString();
followIndex = in.readString();
batchSize = in.readVLong();
concurrentProcessors = in.readVInt();
processorMaxTranslogBytes = in.readVLong();
maxBatchOperationCount = in.readVInt();
maxConcurrentReadBatches = in.readVInt();
maxOperationSizeInBytes = in.readVLong();
maxConcurrentWriteBatches = in.readVInt();
maxWriteBufferSize = in.readVInt();
retryTimeout = in.readOptionalTimeValue();
idleShardRetryDelay = in.readOptionalTimeValue();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(leaderIndex);
out.writeString(followIndex);
out.writeVLong(batchSize);
out.writeVInt(concurrentProcessors);
out.writeVLong(processorMaxTranslogBytes);
out.writeVInt(maxBatchOperationCount);
out.writeVInt(maxConcurrentReadBatches);
out.writeVLong(maxOperationSizeInBytes);
out.writeVInt(maxConcurrentWriteBatches);
out.writeVInt(maxWriteBufferSize);
out.writeOptionalTimeValue(retryTimeout);
out.writeOptionalTimeValue(idleShardRetryDelay);
}
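`readFrom` and `writeTo` have to visit the fields in the same order, and the two nullable timeouts need some marker on the wire that says whether a value follows. The sketch below illustrates that idea with plain JDK streams. It is an assumption-laden stand-in: `OptionalDurationCodec` is a hypothetical name, `java.time.Duration` replaces `TimeValue`, and the presence-flag encoding is an illustration rather than a description of what `writeOptionalTimeValue` actually emits.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;

/** Hypothetical codec for a nullable duration; not the Elasticsearch wire format. */
final class OptionalDurationCodec {

    /** Writes a presence flag, followed by the value in milliseconds when present. */
    static void writeOptional(DataOutputStream out, Duration value) throws IOException {
        out.writeBoolean(value != null);
        if (value != null) {
            out.writeLong(value.toMillis());
        }
    }

    /** Reads the presence flag first and only then, if set, the value. */
    static Duration readOptional(DataInputStream in) throws IOException {
        return in.readBoolean() ? Duration.ofMillis(in.readLong()) : null;
    }

    /** Serializes and deserializes one value in memory. */
    static Duration roundTrip(Duration value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                writeOptional(out, value);
            }
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return readOptional(in);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(Duration.ofMillis(10)));
        System.out.println(roundTrip(null));
    }
}
```

If the constructor ends up rejecting null timeouts, as the review comment further up suggests, the optional variants would no longer be strictly necessary.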

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return batchSize == request.batchSize &&
concurrentProcessors == request.concurrentProcessors &&
processorMaxTranslogBytes == request.processorMaxTranslogBytes &&
return maxBatchOperationCount == request.maxBatchOperationCount &&
maxConcurrentReadBatches == request.maxConcurrentReadBatches &&
maxOperationSizeInBytes == request.maxOperationSizeInBytes &&
maxConcurrentWriteBatches == request.maxConcurrentWriteBatches &&
maxWriteBufferSize == request.maxWriteBufferSize &&
Objects.equals(retryTimeout, request.retryTimeout) &&
Objects.equals(idleShardRetryDelay, request.idleShardRetryDelay) &&
Objects.equals(leaderIndex, request.leaderIndex) &&
Objects.equals(followIndex, request.followIndex);
}

@Override
public int hashCode() {
return Objects.hash(leaderIndex, followIndex, batchSize, concurrentProcessors, processorMaxTranslogBytes);
return Objects.hash(leaderIndex, followIndex, maxBatchOperationCount, maxConcurrentReadBatches, maxOperationSizeInBytes,
maxConcurrentWriteBatches, maxWriteBufferSize, retryTimeout, idleShardRetryDelay);
}
}
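The constructor-based validation, extended with the null check on the time values that the review comment asks for, could look roughly like the sketch below. It is a simplified illustration, not the code in this PR: `FollowParams` and `requirePositive` are hypothetical names, only one of the numeric parameters is kept, and `java.time.Duration` stands in for `TimeValue`.

```java
import java.time.Duration;
import java.util.Objects;

/** Hypothetical, trimmed-down request object; not FollowIndexAction.Request. */
final class FollowParams {
    final String leaderIndex;
    final String followIndex;
    final int maxBatchOperationCount;
    final Duration retryTimeout;
    final Duration idleShardRetryDelay;

    FollowParams(String leaderIndex, String followIndex, int maxBatchOperationCount,
                 Duration retryTimeout, Duration idleShardRetryDelay) {
        // Validate everything up front so that an instance can never be half-initialized.
        this.leaderIndex = Objects.requireNonNull(leaderIndex, "leaderIndex");
        this.followIndex = Objects.requireNonNull(followIndex, "followIndex");
        this.maxBatchOperationCount = requirePositive(maxBatchOperationCount, "maxBatchOperationCount");
        this.retryTimeout = Objects.requireNonNull(retryTimeout, "retryTimeout");
        this.idleShardRetryDelay = Objects.requireNonNull(idleShardRetryDelay, "idleShardRetryDelay");
    }

    /** Naming the parameter in one place keeps the error message in sync with the field name. */
    private static int requirePositive(int value, String name) {
        if (value < 1) {
            throw new IllegalArgumentException(name + " must be larger than 0");
        }
        return value;
    }

    public static void main(String[] args) {
        FollowParams params = new FollowParams("leader", "follower", 1024,
            Duration.ofSeconds(30), Duration.ofMillis(10));
        System.out.println(params.leaderIndex + " -> " + params.followIndex);
    }
}
```

Routing every range check through one helper also avoids error messages that still mention a parameter's old name after a rename.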

@@ -197,7 +218,7 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
ClusterState localClusterState = clusterService.state();
IndexMetaData followIndexMetadata = localClusterState.getMetaData().index(request.followIndex);

String[] indices = new String[]{request.getLeaderIndex()};
String[] indices = new String[]{request.leaderIndex};
Map<String, List<String>> remoteClusterIndices = remoteClusterService.groupClusterIndices(indices, s -> false);
if (remoteClusterIndices.containsKey(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
// Following an index in local cluster, so use local cluster state to fetch leader IndexMetaData:
@@ -251,10 +272,21 @@ void start(Request request, String clusterNameAlias, IndexMetaData leaderIndexMe
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
for (int i = 0; i < numShards; i++) {
final int shardId = i;
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
TimeValue retryTimeout = ShardFollowNodeTask.DEFAULT_RETRY_TIMEOUT;
if (request.retryTimeout != null) {
Review comment (Contributor): I don't think these can be null?
retryTimeout = request.retryTimeout;
}
TimeValue idleShardRetryDelay = ShardFollowNodeTask.DEFAULT_IDLE_SHARD_RETRY_DELAY;
if (request.idleShardRetryDelay != null) {
idleShardRetryDelay = request.idleShardRetryDelay;
}

ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias,
new ShardId(followIndexMetadata.getIndex(), shardId),
new ShardId(leaderIndexMetadata.getIndex(), shardId),
request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes, filteredHeaders);
request.maxBatchOperationCount, request.maxConcurrentReadBatches, request.maxOperationSizeInBytes,
request.maxConcurrentWriteBatches, request.maxWriteBufferSize, retryTimeout, idleShardRetryDelay,
filteredHeaders);
persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask,
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
@Override