[Zen2] Add lag detector #35685
A publication can succeed and complete before all nodes have applied the published state and acknowledged it, thanks to the publication timeout; however we need every node eventually either to apply the published state (or a later state) or be removed from the cluster. This change introduces the LagDetector which achieves this liveness property by removing any lagging nodes from the cluster.
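The liveness property described above can be illustrated with a minimal, single-threaded sketch. This is not the `LagDetector` added by this PR: the class name `LagDetectorSketch`, the `String` node ids, and the `trackNode` and `checkForLag` methods are assumptions made for illustration. The real class works with `DiscoveryNode` and schedules the check on a thread pool after a configurable timeout.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Simplified, single-threaded illustration of lag detection; not the Elasticsearch class. */
public class LagDetectorSketch {
    // Highest cluster state version each tracked node has acknowledged applying.
    private final Map<String, Long> appliedVersionByNode = new HashMap<>();
    // Callback invoked for each lagging node (in the PR, this removes the node from the cluster).
    private final Consumer<String> onLagDetected;

    public LagDetectorSketch(Consumer<String> onLagDetected) {
        this.onLagDetected = onLagDetected;
    }

    /** Start tracking a node; it has applied nothing yet. */
    public void trackNode(String nodeId) {
        appliedVersionByNode.putIfAbsent(nodeId, 0L);
    }

    /** Record an acknowledgement. Versions only move forwards; untracked nodes are ignored. */
    public void setAppliedVersion(String nodeId, long appliedVersion) {
        appliedVersionByNode.computeIfPresent(nodeId, (id, current) -> Math.max(current, appliedVersion));
    }

    /**
     * Hypothetical hook, called once the lag timeout for the publication of
     * {@code version} has elapsed: every tracked node still below that version
     * is reported as lagging and dropped from tracking.
     */
    public void checkForLag(long version) {
        List<String> lagging = new ArrayList<>();
        appliedVersionByNode.forEach((nodeId, applied) -> {
            if (applied < version) {
                lagging.add(nodeId);
            }
        });
        for (String nodeId : lagging) {
            appliedVersionByNode.remove(nodeId);
            onLagDetected.accept(nodeId);
        }
    }

    public static void main(String[] args) {
        List<String> removed = new ArrayList<>();
        LagDetectorSketch detector = new LagDetectorSketch(removed::add);
        detector.trackNode("node-a");
        detector.trackNode("node-b");
        detector.setAppliedVersion("node-a", 5L); // node-a acknowledged version 5
        detector.setAppliedVersion("node-b", 4L); // node-b is still on version 4
        detector.checkForLag(5L);                 // the timeout for version 5 has elapsed
        System.out.println(removed);
    }
}
```

In this sketch a node that has applied a later state than the one being checked is never reported, which matches the "published state (or a later state)" wording of the description.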
Pinging @elastic/es-distributed
This looks very good. I've left one question which might have impact on the tests, so I'm leaving the review of those to later.
Ok, I've addressed all those points, thanks.
Some minor comments, and perhaps a simplification to avoid adding a hook into the Publication class.
@@ -251,6 +255,7 @@ void sendApplyCommit() {
void setAppliedCommit() {
assert state == PublicationTargetState.SENT_APPLY_COMMIT : state + " -> " + PublicationTargetState.APPLIED_COMMIT;
state = PublicationTargetState.APPLIED_COMMIT;
+ onNodeApplicationAck.accept(discoveryNode, publishRequest.getAcceptedState().version());
I wonder if we can avoid hooking this in here, and whether there's a way to do this in Coordinator instead. We call ackOnce(null) here, which in turn calls AckListener.onNodeAck(DiscoveryNode, @Nullable Exception). We already hook into that AckListener in Coordinator, so we could also get those events there. And we also know the version of the cluster state we're publishing in Coordinator.
"And we also know the version of the cluster state we're publishing in Coordinator."
How do we know that? We start the lag detector after clearing currentPublication, and another publication could then start. It wouldn't be right to detect lag based on a newer publication.
My suggestion is something like the following (I had to revert the not-null assertion, because I think we don't have that guarantee, and CoordinatorTests were failing):
diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java
index f132998d6ba..64e9310402c 100644
--- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java
@@ -985,6 +985,9 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
@Override
public void onNodeAck(DiscoveryNode node, Exception e) {
+ if (e == null) {
+ lagDetector.setAppliedVersion(node, publishRequest.getAcceptedState().version());
+ }
// acking and cluster state application for local node is handled specially
if (node.equals(getLocalNode())) {
synchronized (mutex) {
@@ -999,7 +1002,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
}
}
},
- transportService.getThreadPool()::relativeTimeInMillis, lagDetector::setAppliedVersion);
+ transportService.getThreadPool()::relativeTimeInMillis);
this.publishRequest = publishRequest;
this.publicationContext = publicationContext;
this.localNodeAckEvent = localNodeAckEvent;
diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/LagDetector.java b/server/src/main/java/org/elasticsearch/cluster/coordination/LagDetector.java
index 3180913a012..ea52ec95673 100644
--- a/server/src/main/java/org/elasticsearch/cluster/coordination/LagDetector.java
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/LagDetector.java
@@ -85,8 +85,11 @@ public class LagDetector {
}
final NodeAppliedStateTracker nodeAppliedStateTracker = appliedStateTrackersByNode.get(discoveryNode);
- assert nodeAppliedStateTracker != null : "untracked node " + discoveryNode + " applied version " + appliedVersion;
- nodeAppliedStateTracker.increaseAppliedVersion(appliedVersion);
+ if (nodeAppliedStateTracker == null) {
+ logger.trace("node {} applied version {} but this node's version is not being tracked", discoveryNode, appliedVersion);
+ } else {
+ nodeAppliedStateTracker.increaseAppliedVersion(appliedVersion);
+ }
}
public void startLagDetector(final long version) {
diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java
index a602750bba8..9ec8d562b81 100644
--- a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java
@@ -36,7 +36,6 @@ import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.LongSupplier;
-import java.util.function.ObjLongConsumer;
public abstract class Publication {
@@ -47,19 +46,16 @@ public abstract class Publication {
private final AckListener ackListener;
private final LongSupplier currentTimeSupplier;
private final long startTime;
- private final ObjLongConsumer<DiscoveryNode> onNodeApplicationAck;
private Optional<ApplyCommitRequest> applyCommitRequest; // set when state is committed
private boolean isCompleted; // set when publication is completed
private boolean timedOut; // set when publication timed out
- public Publication(PublishRequest publishRequest, AckListener ackListener, LongSupplier currentTimeSupplier,
- ObjLongConsumer<DiscoveryNode> onNodeApplicationAck) {
+ public Publication(PublishRequest publishRequest, AckListener ackListener, LongSupplier currentTimeSupplier) {
this.publishRequest = publishRequest;
this.ackListener = ackListener;
this.currentTimeSupplier = currentTimeSupplier;
startTime = currentTimeSupplier.getAsLong();
- this.onNodeApplicationAck = onNodeApplicationAck;
applyCommitRequest = Optional.empty();
publicationTargets = new ArrayList<>(publishRequest.getAcceptedState().getNodes().getNodes().size());
publishRequest.getAcceptedState().getNodes().iterator().forEachRemaining(n -> publicationTargets.add(new PublicationTarget(n)));
@@ -255,7 +251,6 @@ public abstract class Publication {
void setAppliedCommit() {
assert state == PublicationTargetState.SENT_APPLY_COMMIT : state + " -> " + PublicationTargetState.APPLIED_COMMIT;
state = PublicationTargetState.APPLIED_COMMIT;
- onNodeApplicationAck.accept(discoveryNode, publishRequest.getAcceptedState().version());
ackOnce(null);
}
diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java
index e17c77ce6dc..914ee1e95f7 100644
--- a/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java
@@ -103,8 +103,7 @@ public class PublicationTests extends ESTestCase {
Set<DiscoveryNode> missingJoins = new HashSet<>();
MockPublication(PublishRequest publishRequest, Discovery.AckListener ackListener, LongSupplier currentTimeSupplier) {
- super(publishRequest, ackListener, currentTimeSupplier, (n, l) -> {
- });
+ super(publishRequest, ackListener, currentTimeSupplier);
this.publishRequest = publishRequest;
}
I see, ok, I did that in 63b21ee.
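The reason the suggested diff avoids detecting lag against a newer publication can be shown in isolation: each publication's ack listener closes over that publication's own version, so a late ack is always recorded against the version it belongs to. The sketch below is a simplified assumption for illustration, not the Coordinator code; `AckVersionSketch`, `AppliedVersionSink`, `listenerFor`, and the `String` node ids are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.List;

public class AckVersionSketch {
    /** Hypothetical stand-in for the ack listener callback used during publication. */
    interface AckListener {
        void onNodeAck(String node, Exception e);
    }

    /** Hypothetical sink standing in for a call such as lagDetector.setAppliedVersion. */
    interface AppliedVersionSink {
        void setAppliedVersion(String node, long version);
    }

    /** Each publication builds its own listener, which captures that publication's version. */
    static AckListener listenerFor(long publishedVersion, AppliedVersionSink sink) {
        return (node, e) -> {
            if (e == null) { // only a successful ack means the state was applied
                sink.setAppliedVersion(node, publishedVersion);
            }
        };
    }

    public static void main(String[] args) {
        List<String> recorded = new ArrayList<>();
        AppliedVersionSink sink = (node, version) -> recorded.add(node + "@" + version);

        AckListener first = listenerFor(7L, sink);
        AckListener second = listenerFor(8L, sink); // a later publication has already started

        first.onNodeAck("node-a", null);                                   // late ack, still recorded as version 7
        second.onNodeAck("node-b", new RuntimeException("apply failed")); // failed ack is ignored
        System.out.println(recorded);
    }
}
```

Because the version travels with the listener rather than being read from shared state such as `currentPublication`, starting another publication does not change which version an earlier ack is attributed to.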
deterministicTaskQueue = new DeterministicTaskQueue(Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build(), random());

failedNodes = new HashSet<>();
lagDetector = new LagDetector(Settings.EMPTY, deterministicTaskQueue.getThreadPool(), failedNodes::add, () -> localNode);
randomize the lag timeout here?
Sure, 3102f8c
I've left two more comments.
LGTM