[Zen2] Add lag detector #35685

Merged
merged 17 commits into from
Nov 26, 2018

DaveCTurner
Contributor

A publication can succeed and complete before all nodes have applied the
published state and acknowledged it, thanks to the publication timeout; however
we need every node eventually either to apply the published state (or a later
state) or be removed from the cluster. This change introduces the LagDetector
which achieves this liveness property by removing any lagging nodes from the
cluster.
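
As a rough illustration of the liveness property described above, the following is a minimal sketch and not the code in this PR. It assumes nodes are identified by plain strings, and it replaces the timeout scheduling with an explicit `checkForLag` call that the caller would run once the lag timeout has elapsed. The class name `LagDetectorSketch` and the methods `setTrackedNodes` and `checkForLag` are hypothetical names chosen for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for the lag detector idea; node identity is a plain String.
class LagDetectorSketch {
    // Highest cluster state version each tracked node is known to have applied.
    private final Map<String, Long> appliedVersionByNode = new HashMap<>();
    // Callback invoked for each lagging node, e.g. to remove it from the cluster.
    private final Consumer<String> onLagDetected;

    LagDetectorSketch(Consumer<String> onLagDetected) {
        this.onLagDetected = onLagDetected;
    }

    // Track exactly the given nodes, forgetting any node that is no longer in the set.
    void setTrackedNodes(Set<String> nodes) {
        appliedVersionByNode.keySet().retainAll(nodes);
        for (String node : nodes) {
            appliedVersionByNode.putIfAbsent(node, 0L);
        }
    }

    // Record an acknowledgement. Versions only move forwards; untracked nodes are ignored.
    void setAppliedVersion(String node, long appliedVersion) {
        appliedVersionByNode.computeIfPresent(node, (n, current) -> Math.max(current, appliedVersion));
    }

    // Assumed to be called once the lag timeout for `version` has elapsed:
    // every tracked node that has not applied `version` or later is reported as lagging.
    void checkForLag(long version) {
        List<String> lagging = new ArrayList<>();
        appliedVersionByNode.forEach((node, applied) -> {
            if (applied < version) {
                lagging.add(node);
            }
        });
        lagging.forEach(onLagDetected);
    }
}
```

In this sketch a node that acknowledges a later version also counts as caught up, which matches the "(or a later state)" wording above.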

@DaveCTurner DaveCTurner added >enhancement v7.0.0 :Distributed/Cluster Coordination Cluster formation and cluster state publication, including cluster membership and fault detection. labels Nov 19, 2018
@elasticmachine
Collaborator

Pinging @elastic/es-distributed

Contributor

@ywelsch ywelsch left a comment

This looks very good. I've left one question which might have an impact on the tests, so I'm leaving the review of those for later.

@ywelsch ywelsch mentioned this pull request Nov 20, 2018
61 tasks
@DaveCTurner
Contributor Author

Ok, I've addressed all those points, thanks.

Contributor

@ywelsch ywelsch left a comment

Some minor comments, and perhaps a simplification to avoid adding a hook into the Publication class.

@@ -251,6 +255,7 @@ void sendApplyCommit() {
         void setAppliedCommit() {
             assert state == PublicationTargetState.SENT_APPLY_COMMIT : state + " -> " + PublicationTargetState.APPLIED_COMMIT;
             state = PublicationTargetState.APPLIED_COMMIT;
+            onNodeApplicationAck.accept(discoveryNode, publishRequest.getAcceptedState().version());
Contributor

I wonder if we can avoid hooking this in here, and whether there's a way to do this in Coordinator instead. We call ackOnce(null) here, which in turn calls AckListener.onNodeAck(DiscoveryNode, @Nullable Exception). We already hook into that AckListener in Coordinator, so we could also get those events there. And we also know the version of the cluster state we're publishing in Coordinator.

Contributor Author

> And we also know the version of the cluster state we're publishing in Coordinator.

How do we know that? We start the lag detector after clearing currentPublication, and another publication could then start. It wouldn't be right to detect lag based on a newer publication.
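
One way to sidestep this concern, sketched here under assumptions rather than taken from the PR, is for each publication's ack listener to carry the version of its own publication, so that a late acknowledgement can never be attributed to a newer publication. The class `VersionCapturingAckListener`, the string node identifiers and the `recorder` callback are hypothetical names used only for this example; `onNodeAck` mirrors the shape of `AckListener.onNodeAck` mentioned above.

```java
import java.util.function.ObjLongConsumer;

// Hypothetical illustration: one listener instance per publication, holding that publication's version.
class VersionCapturingAckListener {
    // Fixed when the publication is created; never updated by later publications.
    private final long publishedVersion;
    // Receives (node, version) for each successful ack, e.g. a lag detector's setAppliedVersion.
    private final ObjLongConsumer<String> recorder;

    VersionCapturingAckListener(long publishedVersion, ObjLongConsumer<String> recorder) {
        this.publishedVersion = publishedVersion;
        this.recorder = recorder;
    }

    // Only successful acknowledgements (no exception) count as the node having applied the state.
    void onNodeAck(String node, Exception e) {
        if (e == null) {
            recorder.accept(node, publishedVersion);
        }
    }
}
```

With this shape, an acknowledgement that arrives after a second publication has started is still recorded against the version of the first publication, because the version is read from the listener rather than from any shared "current publication" state.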

Contributor

My suggestion is something like the following (I had to revert the not-null assertion, because I think we don't have that guarantee, and CoordinatorTests were failing):

diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java
index f132998d6ba..64e9310402c 100644
--- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java
@@ -985,6 +985,9 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
 
                     @Override
                     public void onNodeAck(DiscoveryNode node, Exception e) {
+                        if (e == null) {
+                            lagDetector.setAppliedVersion(node, publishRequest.getAcceptedState().version());
+                        }
                         // acking and cluster state application for local node is handled specially
                         if (node.equals(getLocalNode())) {
                             synchronized (mutex) {
@@ -999,7 +1002,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
                         }
                     }
                 },
-                transportService.getThreadPool()::relativeTimeInMillis, lagDetector::setAppliedVersion);
+                transportService.getThreadPool()::relativeTimeInMillis);
             this.publishRequest = publishRequest;
             this.publicationContext = publicationContext;
             this.localNodeAckEvent = localNodeAckEvent;
diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/LagDetector.java b/server/src/main/java/org/elasticsearch/cluster/coordination/LagDetector.java
index 3180913a012..ea52ec95673 100644
--- a/server/src/main/java/org/elasticsearch/cluster/coordination/LagDetector.java
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/LagDetector.java
@@ -85,8 +85,11 @@ public class LagDetector {
         }
 
         final NodeAppliedStateTracker nodeAppliedStateTracker = appliedStateTrackersByNode.get(discoveryNode);
-        assert nodeAppliedStateTracker != null : "untracked node " + discoveryNode + " applied version " + appliedVersion;
-        nodeAppliedStateTracker.increaseAppliedVersion(appliedVersion);
+        if (nodeAppliedStateTracker == null) {
+            logger.trace("node {} applied version {} but this node's version is not being tracked", discoveryNode, appliedVersion);
+        } else {
+            nodeAppliedStateTracker.increaseAppliedVersion(appliedVersion);
+        }
     }
 
     public void startLagDetector(final long version) {
diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java
index a602750bba8..9ec8d562b81 100644
--- a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java
@@ -36,7 +36,6 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.LongSupplier;
-import java.util.function.ObjLongConsumer;
 
 public abstract class Publication {
 
@@ -47,19 +46,16 @@ public abstract class Publication {
     private final AckListener ackListener;
     private final LongSupplier currentTimeSupplier;
     private final long startTime;
-    private final ObjLongConsumer<DiscoveryNode> onNodeApplicationAck;
 
     private Optional<ApplyCommitRequest> applyCommitRequest; // set when state is committed
     private boolean isCompleted; // set when publication is completed
     private boolean timedOut; // set when publication timed out
 
-    public Publication(PublishRequest publishRequest, AckListener ackListener, LongSupplier currentTimeSupplier,
-                       ObjLongConsumer<DiscoveryNode> onNodeApplicationAck) {
+    public Publication(PublishRequest publishRequest, AckListener ackListener, LongSupplier currentTimeSupplier) {
         this.publishRequest = publishRequest;
         this.ackListener = ackListener;
         this.currentTimeSupplier = currentTimeSupplier;
         startTime = currentTimeSupplier.getAsLong();
-        this.onNodeApplicationAck = onNodeApplicationAck;
         applyCommitRequest = Optional.empty();
         publicationTargets = new ArrayList<>(publishRequest.getAcceptedState().getNodes().getNodes().size());
         publishRequest.getAcceptedState().getNodes().iterator().forEachRemaining(n -> publicationTargets.add(new PublicationTarget(n)));
@@ -255,7 +251,6 @@ public abstract class Publication {
         void setAppliedCommit() {
             assert state == PublicationTargetState.SENT_APPLY_COMMIT : state + " -> " + PublicationTargetState.APPLIED_COMMIT;
             state = PublicationTargetState.APPLIED_COMMIT;
-            onNodeApplicationAck.accept(discoveryNode, publishRequest.getAcceptedState().version());
             ackOnce(null);
         }
 
diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java
index e17c77ce6dc..914ee1e95f7 100644
--- a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java
@@ -103,8 +103,7 @@ public class PublicationTests extends ESTestCase {
         Set<DiscoveryNode> missingJoins = new HashSet<>();
 
         MockPublication(PublishRequest publishRequest, Discovery.AckListener ackListener, LongSupplier currentTimeSupplier) {
-            super(publishRequest, ackListener, currentTimeSupplier, (n, l) -> {
-            });
+            super(publishRequest, ackListener, currentTimeSupplier);
             this.publishRequest = publishRequest;
         }

Contributor Author

I see, ok, I did that in 63b21ee.

deterministicTaskQueue = new DeterministicTaskQueue(Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build(), random());

failedNodes = new HashSet<>();
lagDetector = new LagDetector(Settings.EMPTY, deterministicTaskQueue.getThreadPool(), failedNodes::add, () -> localNode);
Contributor

randomize the lag timeout here?

Contributor Author

Sure, 3102f8c

@ywelsch
Contributor

ywelsch commented Nov 23, 2018

I've left two more comments.

Contributor

@ywelsch ywelsch left a comment

LGTM

@DaveCTurner DaveCTurner merged commit a68a464 into elastic:zen2 Nov 26, 2018
@DaveCTurner DaveCTurner deleted the 2018-11-19-lag-detector branch November 26, 2018 10:52
Labels
:Distributed/Cluster Coordination Cluster formation and cluster state publication, including cluster membership and fault detection. >enhancement v7.0.0-beta1

4 participants