diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 286ee3f5c0d37..c7e6fc91b9440 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -92,6 +92,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; @TestLogging("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.discovery:TRACE") @@ -305,7 +306,7 @@ public void testAckListenerReceivesNackFromLeader() { leader.setClusterStateApplyResponse(ClusterStateApplyResponse.FAIL); AckCollector ackCollector = leader.submitValue(randomLong()); - cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "committing value"); assertTrue(leader.coordinator.getMode() != Coordinator.Mode.LEADER || leader.coordinator.getCurrentTerm() > startingTerm); leader.setClusterStateApplyResponse(ClusterStateApplyResponse.SUCCEED); cluster.stabilise(); @@ -325,7 +326,7 @@ public void testAckListenerReceivesNoAckFromHangingFollower() { follower0.setClusterStateApplyResponse(ClusterStateApplyResponse.HANG); AckCollector ackCollector = leader.submitValue(randomLong()); - cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "committing value"); assertTrue("expected immediate ack from " + follower1, ackCollector.hasAckedSuccessfully(follower1)); assertFalse("expected no ack from " + leader, ackCollector.hasAckedSuccessfully(leader)); cluster.stabilise(); @@ -344,7 +345,7 @@ public void testAckListenerReceivesNacksIfPublicationTimesOut() { follower0.blackhole(); follower1.blackhole(); AckCollector ackCollector = leader.submitValue(randomLong()); - cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "committing value"); assertFalse("expected no immediate ack from " + leader, ackCollector.hasAcked(leader)); assertFalse("expected no immediate ack from " + follower0, ackCollector.hasAcked(follower0)); assertFalse("expected no immediate ack from " + follower1, ackCollector.hasAcked(follower1)); @@ -501,6 +502,8 @@ void runRandomly() { while (finishTime == -1 || deterministicTaskQueue.getCurrentTimeMillis() <= finishTime) { step++; + final int thisStep = step; // for lambdas + if (randomSteps <= step && finishTime == -1) { finishTime = deterministicTaskQueue.getLatestDeferredExecutionTime(); deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY); @@ -511,14 +514,19 @@ void runRandomly() { if (rarely()) { final ClusterNode clusterNode = getAnyNodePreferringLeaders(); final int newValue = randomInt(); - logger.debug("----> [runRandomly {}] proposing new value [{}] to [{}]", step, newValue, clusterNode.getId()); - clusterNode.submitValue(newValue); + onNode(clusterNode.getLocalNode(), () -> { + logger.debug("----> [runRandomly {}] proposing new value [{}] to [{}]", + thisStep, newValue, clusterNode.getId()); + clusterNode.submitValue(newValue); + }).run(); } else if (rarely()) { final ClusterNode clusterNode = getAnyNode(); - logger.debug("----> [runRandomly {}] forcing {} to become candidate", step, clusterNode.getId()); - synchronized (clusterNode.coordinator.mutex) { - clusterNode.coordinator.becomeCandidate("runRandomly"); - } + onNode(clusterNode.getLocalNode(), () -> { + logger.debug("----> [runRandomly {}] forcing {} to become candidate", thisStep, clusterNode.getId()); + synchronized (clusterNode.coordinator.mutex) { + clusterNode.coordinator.becomeCandidate("runRandomly"); + } + }).run(); } else if (rarely()) { final ClusterNode clusterNode = getAnyNode(); @@ -587,10 +595,10 @@ void stabilise() { stabilise(DEFAULT_STABILISATION_TIME); } - void stabilise(long stabiliationDurationMillis) { - logger.info("--> stabilising until [{}ms]", deterministicTaskQueue.getCurrentTimeMillis() + stabiliationDurationMillis); - deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY); - runFor(stabiliationDurationMillis); + void stabilise(long stabilisationDurationMillis) { + assertThat("stabilisation requires default delay variability (and proper cleanup of raised variability)", + deterministicTaskQueue.getExecutionDelayVariabilityMillis(), lessThanOrEqualTo(DEFAULT_DELAY_VARIABILITY)); + runFor(stabilisationDurationMillis, "stabilising"); // TODO remove when term-bumping is enabled final long maxTerm = clusterNodes.stream().map(n -> n.coordinator.getCurrentTerm()).max(Long::compare).orElse(0L); @@ -600,20 +608,22 @@ void stabilise(long stabiliationDurationMillis) { if (maxLeaderTerm < maxTerm) { logger.info("--> forcing a term bump, maxTerm={}, maxLeaderTerm={}", maxTerm, maxLeaderTerm); final ClusterNode leader = getAnyLeader(); - synchronized (leader.coordinator.mutex) { - leader.coordinator.ensureTermAtLeast(leader.localNode, maxTerm + 1); - } - leader.coordinator.startElection(); - logger.info("--> re-stabilising after term bump until [{}ms]", - deterministicTaskQueue.getCurrentTimeMillis() + DEFAULT_ELECTION_DELAY); - runFor(DEFAULT_ELECTION_DELAY); + onNode(leader.getLocalNode(), () -> { + synchronized (leader.coordinator.mutex) { + leader.coordinator.ensureTermAtLeast(leader.localNode, maxTerm + 1); + } + leader.coordinator.startElection(); + }).run(); + runFor(DEFAULT_ELECTION_DELAY, "re-stabilising after term bump"); } + logger.info("--> end of stabilisation"); assertUniqueLeaderAndExpectedModes(); } - void runFor(long runDurationMillis) { + void runFor(long runDurationMillis, String description) { final long endTime = deterministicTaskQueue.getCurrentTimeMillis() + runDurationMillis; + logger.info("----> runFor({}ms) running until [{}ms]: {}", runDurationMillis, endTime, description); while (deterministicTaskQueue.getCurrentTimeMillis() < endTime) { @@ -637,6 +647,8 @@ void runFor(long runDurationMillis) { deterministicTaskQueue.advanceTime(); } + + logger.info("----> runFor({}ms) completed run until [{}ms]: {}", runDurationMillis, endTime, description); } private boolean isConnectedPair(ClusterNode n1, ClusterNode n2) { @@ -963,7 +975,7 @@ public void run() { final ClusterState newClusterState = clusterStateSupplier.get(); assert oldClusterState.version() <= newClusterState.version() : "updating cluster state from version " - + oldClusterState.version() + " to stale version " + newClusterState.version(); + + oldClusterState.version() + " to stale version " + newClusterState.version(); clusterApplier.lastAppliedClusterState = newClusterState; }