Skip to content

Commit

Permalink
Minor housekeeping of tests (#34315)
Browse files Browse the repository at this point in the history
From experience with #34257, here are a few things that help with analysing
logs from test runs. Also we prevent trying to stabilise a cluster with raised
delay variability, because lowering the delay variability requires time to
allow all the extra-varied-scheduled tasks to work their way out of the system.
  • Loading branch information
DaveCTurner authored Oct 5, 2018
1 parent b32abcb commit 29d7d1d
Showing 1 changed file with 34 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;

@TestLogging("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.discovery:TRACE")
Expand Down Expand Up @@ -305,7 +306,7 @@ public void testAckListenerReceivesNackFromLeader() {

leader.setClusterStateApplyResponse(ClusterStateApplyResponse.FAIL);
AckCollector ackCollector = leader.submitValue(randomLong());
cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "committing value");
assertTrue(leader.coordinator.getMode() != Coordinator.Mode.LEADER || leader.coordinator.getCurrentTerm() > startingTerm);
leader.setClusterStateApplyResponse(ClusterStateApplyResponse.SUCCEED);
cluster.stabilise();
Expand All @@ -325,7 +326,7 @@ public void testAckListenerReceivesNoAckFromHangingFollower() {

follower0.setClusterStateApplyResponse(ClusterStateApplyResponse.HANG);
AckCollector ackCollector = leader.submitValue(randomLong());
cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "committing value");
assertTrue("expected immediate ack from " + follower1, ackCollector.hasAckedSuccessfully(follower1));
assertFalse("expected no ack from " + leader, ackCollector.hasAckedSuccessfully(leader));
cluster.stabilise();
Expand All @@ -344,7 +345,7 @@ public void testAckListenerReceivesNacksIfPublicationTimesOut() {
follower0.blackhole();
follower1.blackhole();
AckCollector ackCollector = leader.submitValue(randomLong());
cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "committing value");
assertFalse("expected no immediate ack from " + leader, ackCollector.hasAcked(leader));
assertFalse("expected no immediate ack from " + follower0, ackCollector.hasAcked(follower0));
assertFalse("expected no immediate ack from " + follower1, ackCollector.hasAcked(follower1));
Expand Down Expand Up @@ -501,6 +502,8 @@ void runRandomly() {

while (finishTime == -1 || deterministicTaskQueue.getCurrentTimeMillis() <= finishTime) {
step++;
final int thisStep = step; // for lambdas

if (randomSteps <= step && finishTime == -1) {
finishTime = deterministicTaskQueue.getLatestDeferredExecutionTime();
deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY);
Expand All @@ -511,14 +514,19 @@ void runRandomly() {
if (rarely()) {
final ClusterNode clusterNode = getAnyNodePreferringLeaders();
final int newValue = randomInt();
logger.debug("----> [runRandomly {}] proposing new value [{}] to [{}]", step, newValue, clusterNode.getId());
clusterNode.submitValue(newValue);
onNode(clusterNode.getLocalNode(), () -> {
logger.debug("----> [runRandomly {}] proposing new value [{}] to [{}]",
thisStep, newValue, clusterNode.getId());
clusterNode.submitValue(newValue);
}).run();
} else if (rarely()) {
final ClusterNode clusterNode = getAnyNode();
logger.debug("----> [runRandomly {}] forcing {} to become candidate", step, clusterNode.getId());
synchronized (clusterNode.coordinator.mutex) {
clusterNode.coordinator.becomeCandidate("runRandomly");
}
onNode(clusterNode.getLocalNode(), () -> {
logger.debug("----> [runRandomly {}] forcing {} to become candidate", thisStep, clusterNode.getId());
synchronized (clusterNode.coordinator.mutex) {
clusterNode.coordinator.becomeCandidate("runRandomly");
}
}).run();
} else if (rarely()) {
final ClusterNode clusterNode = getAnyNode();

Expand Down Expand Up @@ -587,10 +595,10 @@ void stabilise() {
stabilise(DEFAULT_STABILISATION_TIME);
}

void stabilise(long stabiliationDurationMillis) {
logger.info("--> stabilising until [{}ms]", deterministicTaskQueue.getCurrentTimeMillis() + stabiliationDurationMillis);
deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY);
runFor(stabiliationDurationMillis);
void stabilise(long stabilisationDurationMillis) {
assertThat("stabilisation requires default delay variability (and proper cleanup of raised variability)",
deterministicTaskQueue.getExecutionDelayVariabilityMillis(), lessThanOrEqualTo(DEFAULT_DELAY_VARIABILITY));
runFor(stabilisationDurationMillis, "stabilising");

// TODO remove when term-bumping is enabled
final long maxTerm = clusterNodes.stream().map(n -> n.coordinator.getCurrentTerm()).max(Long::compare).orElse(0L);
Expand All @@ -600,20 +608,22 @@ void stabilise(long stabiliationDurationMillis) {
if (maxLeaderTerm < maxTerm) {
logger.info("--> forcing a term bump, maxTerm={}, maxLeaderTerm={}", maxTerm, maxLeaderTerm);
final ClusterNode leader = getAnyLeader();
synchronized (leader.coordinator.mutex) {
leader.coordinator.ensureTermAtLeast(leader.localNode, maxTerm + 1);
}
leader.coordinator.startElection();
logger.info("--> re-stabilising after term bump until [{}ms]",
deterministicTaskQueue.getCurrentTimeMillis() + DEFAULT_ELECTION_DELAY);
runFor(DEFAULT_ELECTION_DELAY);
onNode(leader.getLocalNode(), () -> {
synchronized (leader.coordinator.mutex) {
leader.coordinator.ensureTermAtLeast(leader.localNode, maxTerm + 1);
}
leader.coordinator.startElection();
}).run();
runFor(DEFAULT_ELECTION_DELAY, "re-stabilising after term bump");
}
logger.info("--> end of stabilisation");

assertUniqueLeaderAndExpectedModes();
}

void runFor(long runDurationMillis) {
void runFor(long runDurationMillis, String description) {
final long endTime = deterministicTaskQueue.getCurrentTimeMillis() + runDurationMillis;
logger.info("----> runFor({}ms) running until [{}ms]: {}", runDurationMillis, endTime, description);

while (deterministicTaskQueue.getCurrentTimeMillis() < endTime) {

Expand All @@ -637,6 +647,8 @@ void runFor(long runDurationMillis) {

deterministicTaskQueue.advanceTime();
}

logger.info("----> runFor({}ms) completed run until [{}ms]: {}", runDurationMillis, endTime, description);
}

private boolean isConnectedPair(ClusterNode n1, ClusterNode n2) {
Expand Down Expand Up @@ -963,7 +975,7 @@ public void run() {
final ClusterState newClusterState = clusterStateSupplier.get();
assert oldClusterState.version() <= newClusterState.version() :
"updating cluster state from version "
+ oldClusterState.version() + " to stale version " + newClusterState.version();
+ oldClusterState.version() + " to stale version " + newClusterState.version();
clusterApplier.lastAppliedClusterState = newClusterState;
}

Expand Down

0 comments on commit 29d7d1d

Please sign in to comment.