Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CCR] Made shard follow task more resilient against node failure and #31242

Closed
wants to merge 2 commits into from

Conversation

martijnvg
Copy link
Member

added a test that verifies we can close a node while following an index.

@martijnvg martijnvg added WIP :Distributed/CCR Issues around the Cross Cluster State Replication features labels Jun 11, 2018
@elasticmachine
Copy link
Collaborator

Pinging @elastic/es-distributed

@martijnvg
Copy link
Member Author

martijnvg commented Jun 11, 2018

I've marked this PR as WIP, because the test added in this pr sometimes fails with a assertion like this one:

com.carrotsearch.randomizedtesting.UncaughtExceptionError: Captured an uncaught exception in thread: Thread[id=890, name=elasticsearch[node_t1][write][T#3], state=RUNNABLE, group=TGRP-ShardChangesIT]

Caused by: java.lang.AssertionError: seqNo [145] was processed twice in generation [2], with different data. prvOp [Index{id='OdQ-7mMB7Vq08HSajLM6', type='doc', seqNo=145, primaryTerm=1}], newOp [Index{id='OdQ-7mMB7Vq08HSajLM6', type='doc', seqNo=145, primaryTerm=2}]
	at __randomizedtesting.SeedInfo.seed([E8A3400D23EF4FF9]:0)
	at org.elasticsearch.index.translog.TranslogWriter.assertNoSeqNumberConflict(TranslogWriter.java:207)
	at org.elasticsearch.index.translog.TranslogWriter.add(TranslogWriter.java:192)
	at org.elasticsearch.index.translog.Translog.add(Translog.java:499)
	at org.elasticsearch.index.engine.InternalEngine.index(InternalEngine.java:794)
	at org.elasticsearch.index.shard.IndexShard.index(IndexShard.java:699)
	at org.elasticsearch.index.shard.IndexShard.applyIndexOperation(IndexShard.java:675)
	at org.elasticsearch.index.shard.IndexShard.applyTranslogOperation(IndexShard.java:1209)
	at org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction.applyTranslogOperations(TransportBulkShardOperationsAction.java:120)
	at org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction.shardOperationOnReplica(TransportBulkShardOperationsAction.java:112)
	at org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction.shardOperationOnReplica(TransportBulkShardOperationsAction.java:29)
	at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncReplicaAction.onResponse(TransportReplicationAction.java:565)
	at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncReplicaAction.onResponse(TransportReplicationAction.java:528)
	at org.elasticsearch.index.shard.IndexShard$2.onResponse(IndexShard.java:2294)
	at org.elasticsearch.index.shard.IndexShard$2.onResponse(IndexShard.java:2272)
	at org.elasticsearch.index.shard.IndexShardOperationPermits.acquire(IndexShardOperationPermits.java:266)
	at org.elasticsearch.index.shard.IndexShardOperationPermits.acquire(IndexShardOperationPermits.java:233)
	at org.elasticsearch.index.shard.IndexShard.acquireReplicaOperationPermit(IndexShard.java:2271)
	at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncReplicaAction.doRun(TransportReplicationAction.java:634)
	at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
	at org.elasticsearch.action.support.replication.TransportReplicationAction$ReplicaOperationTransportHandler.messageReceived(TransportReplicationAction.java:511)
	at org.elasticsearch.action.support.replication.TransportReplicationAction$ReplicaOperationTransportHandler.messageReceived(TransportReplicationAction.java:491)
	at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:66)
	at org.elasticsearch.transport.TcpTransport$RequestHandler.doRun(TcpTransport.java:1663)
	at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:724)
	at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:844)
Caused by: java.lang.RuntimeException: stack capture previous op
	at org.elasticsearch.index.translog.TranslogWriter.assertNoSeqNumberConflict(TranslogWriter.java:211)
	at org.elasticsearch.index.translog.TranslogWriter.add(TranslogWriter.java:192)
	at org.elasticsearch.index.translog.Translog.add(Translog.java:499)
	at org.elasticsearch.index.engine.InternalEngine.index(InternalEngine.java:794)
	at org.elasticsearch.index.shard.IndexShard.index(IndexShard.java:699)
	at org.elasticsearch.index.shard.IndexShard.applyIndexOperation(IndexShard.java:675)
	at org.elasticsearch.index.shard.IndexShard.applyTranslogOperation(IndexShard.java:1209)
	at org.elasticsearch.indices.recovery.RecoveryTarget.indexTranslogOperations(RecoveryTarget.java:397)
	at org.elasticsearch.indices.recovery.PeerRecoveryTargetService$TranslogOperationsRequestHandler.messageReceived(PeerRecoveryTargetService.java:454)
	at org.elasticsearch.indices.recovery.PeerRecoveryTargetService$TranslogOperationsRequestHandler.messageReceived(PeerRecoveryTargetService.java:445)
	at org.elasticsearch.transport.TransportRequestHandler.messageReceived(TransportRequestHandler.java:30)
	... 7 more

@martijnvg
Copy link
Member Author

martijnvg commented Jun 12, 2018

The reason that this assertion trips is because the primary term for the two translog operations with the same seqno are different. With normal indexing / replication this error should not happen, but with ccr this can currently happen.

In this case particular situation, the node with a follow index primary was closed while the shard follow task was indexing into this shard. The shard follow task then re-sends the same translog operations to the promoted replica shard copy and new replica shard copy. At the same time this new replica shard is also doing shard recovery. This causes the assertion to trip, because the two translog operations with same data and seqno number, but with a different primary term are detected.

Besides this assertion tripping, the follower shard now also contains an additional document that the leader does not have. This document for the write operation with the lowest primary term is soft deleted, but if this follow shard becomes a leader shard for another follow index setup then we will also replicate this surprise write operation the second follow index.

Outside of this assertion failure the test that closes a node while follow an index test passes and the changes to ShardFollowTasksExecutor class were needed to make the shard follow task resilient against nodes / shards disappearing. So I'm adding the review label back.

@martijnvg martijnvg added review and removed WIP labels Jun 12, 2018
added a test that verifies we can close a node while following an index.
@martijnvg
Copy link
Member Author

Superseded by: #31581

@martijnvg martijnvg closed this Jun 26, 2018
martijnvg added a commit to martijnvg/elasticsearch that referenced this pull request Jun 27, 2018
The current shard follow mechanism is complex and does not give us easy ways the have visibility into the system (e.g. why we are falling behind).
The main reason why it is complex is because the current design is highly asynchronous. Also in the current model it is hard to apply backpressure
other than reducing the concurrent reads from the leader shard.

This PR has the following changes:
* Rewrote the shard follow task to coordinate the shard follow mechanism between a leader and follow shard in a single threaded manner.
  This allows for better unit testing and makes it easier to add stats.
* All write operations read from the shard changes api should be added to a buffer instead of directly sending it to the bulk shard operations api.
  This allows to apply backpressure. In this PR there is a limit that controls how many write ops are allowed in the buffer after which no new reads
  will be performed until the number of ops is below that limit.
* The shard changes api includes the current global checkpoint on the leader shard copy. This allows reading to be a more self sufficient process;
  instead of relying on a background thread to fetch the leader shard's global checkpoint.
* Reading write operations from the leader shard (via shard changes api) is a seperate step then writing the write operations (via bulk shards operations api).
  Whereas before a read would immediately result into a write.
* The bulk shard operations api returns the local checkpoint on the follow primary shard, to keep the shard follow task up to date with what has been written.
* Moved the shard follow logic that was previously in ShardFollowTasksExecutor to ShardFollowNodeTask.
* Moved over the changes from elastic#31242 to make shard follow mechanism resilient from node and shard failures.

Relates to elastic#30086
martijnvg added a commit that referenced this pull request Jul 10, 2018
The current shard follow mechanism is complex and does not give us easy ways the have visibility into the system (e.g. why we are falling behind).
The main reason why it is complex is because the current design is highly asynchronous. Also in the current model it is hard to apply backpressure
other than reducing the concurrent reads from the leader shard.

This PR has the following changes:
* Rewrote the shard follow task to coordinate the shard follow mechanism between a leader and follow shard in a single threaded manner.
  This allows for better unit testing and makes it easier to add stats.
* All write operations read from the shard changes api should be added to a buffer instead of directly sending it to the bulk shard operations api.
  This allows to apply backpressure. In this PR there is a limit that controls how many write ops are allowed in the buffer after which no new reads
  will be performed until the number of ops is below that limit.
* The shard changes api includes the current global checkpoint on the leader shard copy. This allows reading to be a more self sufficient process;
  instead of relying on a background thread to fetch the leader shard's global checkpoint.
* Reading write operations from the leader shard (via shard changes api) is a separate step then writing the write operations (via bulk shards operations api).
  Whereas before a read would immediately result into a write.
* The bulk shard operations api returns the local checkpoint on the follow primary shard, to keep the shard follow task up to date with what has been written.
* Moved the shard follow logic that was previously in ShardFollowTasksExecutor to ShardFollowNodeTask.
* Moved over the changes from #31242 to make shard follow mechanism resilient from node and shard failures.

Relates to #30086
martijnvg added a commit that referenced this pull request Jul 10, 2018
The current shard follow mechanism is complex and does not give us easy ways the have visibility into the system (e.g. why we are falling behind).
The main reason why it is complex is because the current design is highly asynchronous. Also in the current model it is hard to apply backpressure
other than reducing the concurrent reads from the leader shard.

This PR has the following changes:
* Rewrote the shard follow task to coordinate the shard follow mechanism between a leader and follow shard in a single threaded manner.
  This allows for better unit testing and makes it easier to add stats.
* All write operations read from the shard changes api should be added to a buffer instead of directly sending it to the bulk shard operations api.
  This allows to apply backpressure. In this PR there is a limit that controls how many write ops are allowed in the buffer after which no new reads
  will be performed until the number of ops is below that limit.
* The shard changes api includes the current global checkpoint on the leader shard copy. This allows reading to be a more self sufficient process;
  instead of relying on a background thread to fetch the leader shard's global checkpoint.
* Reading write operations from the leader shard (via shard changes api) is a separate step then writing the write operations (via bulk shards operations api).
  Whereas before a read would immediately result into a write.
* The bulk shard operations api returns the local checkpoint on the follow primary shard, to keep the shard follow task up to date with what has been written.
* Moved the shard follow logic that was previously in ShardFollowTasksExecutor to ShardFollowNodeTask.
* Moved over the changes from #31242 to make shard follow mechanism resilient from node and shard failures.

Relates to #30086
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
:Distributed/CCR Issues around the Cross Cluster State Replication features
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants