Skip to content

Commit

Permalink
runRandomly CoordinationState
Browse files Browse the repository at this point in the history
  • Loading branch information
ywelsch committed Jul 20, 2018
1 parent 384cc54 commit 3bbb576
Show file tree
Hide file tree
Showing 2 changed files with 197 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,18 @@ public void handleCommit(ApplyCommitRequest applyCommit) {
assert getLastCommittedConfiguration().equals(getLastAcceptedConfiguration());
}

public void invariant() {
assert getLastAcceptedTerm() <= getCurrentTerm();
assert electionWon() == isElectionQuorum(joinVotes);
if (electionWon()) {
assert getLastPublishedVersion() >= getLastAcceptedVersion();
} else {
assert getLastPublishedVersion() == 0L;
}
assert electionWon() == false || startedJoinSinceLastReboot;
assert publishVotes.isEmpty() || electionWon();
}

/**
* Pluggable persistence layer for {@link CoordinationState}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,18 @@
import org.elasticsearch.test.EqualsHashCodeTestUtils;
import org.junit.Before;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static java.util.stream.Collectors.toSet;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;

public class CoordinationStateTests extends ESTestCase {

Expand Down Expand Up @@ -752,6 +759,11 @@ public void testVoteCollection() {
});
}

public void testSafety() {
Cluster cluster = new Cluster(randomIntBetween(1, 5));
cluster.runRandomly(10000);
}

public static CoordinationState createCoordinationState(PersistedState storage, DiscoveryNode localNode) {
final Settings initialSettings = Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), localNode.getId()).build();
return new CoordinationState(initialSettings, localNode, storage);
Expand Down Expand Up @@ -827,4 +839,177 @@ public ClusterState getLastAcceptedState() {
return acceptedState;
}
}

static class ClusterNode {

final DiscoveryNode localNode;
final PersistedState persistedState;
CoordinationState state;

ClusterNode(DiscoveryNode localNode) {
this.localNode = localNode;
persistedState = new InMemoryPersistedState(0L,
clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L));
state = new CoordinationState(Settings.EMPTY, localNode, persistedState);
}

void reboot() {
state = new CoordinationState(Settings.EMPTY, localNode, persistedState);
}

void setInitialState(VotingConfiguration initialConfig, long initialValue) {
final ClusterState.Builder builder = ClusterState.builder(state.getLastAcceptedState()).incrementVersion();
builder.lastAcceptedConfiguration(initialConfig);
builder.lastCommittedConfiguration(initialConfig);
state.setInitialState(setValue(builder.build(), initialValue));
}
}

static class Cluster {

final List<Message> messages;
final List<ClusterNode> clusterNodes;
final VotingConfiguration initialConfiguration;
final long initialValue;

Cluster(int numNodes) {
messages = new ArrayList<>();

clusterNodes = IntStream.range(0, numNodes)
.mapToObj(i -> new DiscoveryNode("node_" + i, buildNewFakeTransportAddress(), Version.CURRENT))
.map(ClusterNode::new)
.collect(Collectors.toList());

initialConfiguration = randomVotingConfig();
initialValue = randomLong();
}

static class Message {
final DiscoveryNode sourceNode;
final DiscoveryNode targetNode;
final Object payload;

Message(DiscoveryNode sourceNode, DiscoveryNode targetNode, Object payload) {
this.sourceNode = sourceNode;
this.targetNode = targetNode;
this.payload = payload;
}
}

void reply(Message m, Object payload) {
messages.add(new Message(m.targetNode, m.sourceNode, payload));
}

void broadcast(DiscoveryNode sourceNode, Object payload) {
messages.addAll(clusterNodes.stream().map(cn -> new Message(sourceNode, cn.localNode, payload)).collect(Collectors.toList()));
}

Optional<ClusterNode> getNode(DiscoveryNode node) {
return clusterNodes.stream().filter(cn -> cn.localNode.equals(node)).findFirst();
}

VotingConfiguration randomVotingConfig() {
return new VotingConfiguration(
randomSubsetOf(randomIntBetween(1, clusterNodes.size()), clusterNodes).stream()
.map(cn -> cn.localNode.getId()).collect(toSet()));
}

void applyMessage(Message message) {
final Optional<ClusterNode> maybeNode = getNode(message.targetNode);
if (maybeNode.isPresent() == false) {
throw new CoordinationStateRejectedException("node not available");
} else {
final Object payload = message.payload;
if (payload instanceof StartJoinRequest) {
reply(message, maybeNode.get().state.handleStartJoin((StartJoinRequest) payload));
} else if (payload instanceof Join) {
maybeNode.get().state.handleJoin((Join) payload);
} else if (payload instanceof PublishRequest) {
reply(message, maybeNode.get().state.handlePublishRequest((PublishRequest) payload));
} else if (payload instanceof PublishResponse) {
maybeNode.get().state.handlePublishResponse(message.sourceNode, (PublishResponse) payload)
.ifPresent(ac -> broadcast(message.targetNode, ac));
} else if (payload instanceof ApplyCommitRequest) {
maybeNode.get().state.handleCommit((ApplyCommitRequest) payload);
} else {
throw new AssertionError("unknown message type");
}
}
}

void runRandomly(int iterations) {
final long maxTerm = 4;
long nextTerm = 1;
for (int i = 0; i < iterations; i++) {
try {
if (rarely() && nextTerm < maxTerm) {
final long term = rarely() ? randomLongBetween(0, maxTerm + 1) : nextTerm++;
final StartJoinRequest startJoinRequest = new StartJoinRequest(randomFrom(clusterNodes).localNode, term);
broadcast(startJoinRequest.getSourceNode(), startJoinRequest);
} else if (rarely()) {
randomFrom(clusterNodes).setInitialState(initialConfiguration, initialValue);
} else if (rarely() && rarely()) {
randomFrom(clusterNodes).reboot();
} else if (rarely()) {
final List<ClusterNode> masterNodes = clusterNodes.stream().filter(cn -> cn.state.electionWon())
.collect(Collectors.toList());
if (masterNodes.isEmpty() == false) {
final ClusterNode clusterNode = randomFrom(masterNodes);
final long term = rarely() ? randomLongBetween(0, maxTerm + 1) : clusterNode.state.getCurrentTerm();
final long version = rarely() ? randomIntBetween(0, 5) : clusterNode.state.getLastPublishedVersion() + 1;
final VotingConfiguration acceptedConfig = rarely() ? randomVotingConfig() :
clusterNode.state.getLastAcceptedConfiguration();
final PublishRequest publishRequest = clusterNode.state.handleClientValue(
clusterState(term, version, clusterNode.localNode, clusterNode.state.getLastCommittedConfiguration(),
acceptedConfig, randomLong()));
broadcast(clusterNode.localNode, publishRequest);
}
} else if (messages.isEmpty() == false) {
applyMessage(randomFrom(messages));
}

// check node invariants after each iteration
clusterNodes.forEach(cn -> cn.state.invariant());
} catch (CoordinationStateRejectedException e) {
// ignore
}
}

// check system invariants
invariant();
}

void invariant() {
// one master per term
messages.stream().filter(m -> m.payload instanceof PublishRequest)
.collect(Collectors.groupingBy(m -> ((PublishRequest) m.payload).getAcceptedState().term()))
.forEach((term, publishMessages) -> {
Set<DiscoveryNode> mastersForTerm = publishMessages.stream().collect(Collectors.groupingBy(m -> m.sourceNode)).keySet();
assertThat("Multiple masters " + mastersForTerm + " for term " + term, mastersForTerm, hasSize(1));
});

// unique cluster state per (term, version) pair
messages.stream().filter(m -> m.payload instanceof PublishRequest)
.map(m -> ((PublishRequest) m.payload).getAcceptedState())
.collect(Collectors.groupingBy(ClusterState::term))
.forEach((term, clusterStates) -> {
clusterStates.stream().collect(Collectors.groupingBy(ClusterState::version))
.forEach((version, clusterStates1) -> {
Set<String> clusterStateUUIDsForTermAndVersion = clusterStates1.stream().collect(Collectors.groupingBy(
ClusterState::stateUUID
)).keySet();
assertThat("Multiple cluster states " + clusterStates1 + " for term " + term + " and version " + version,
clusterStateUUIDsForTermAndVersion, hasSize(1));

Set<Long> clusterStateValuesForTermAndVersion = clusterStates1.stream().collect(Collectors.groupingBy(
CoordinationStateTests::value
)).keySet();

assertThat("Multiple cluster states " + clusterStates1 + " for term " + term + " and version " + version,
clusterStateValuesForTermAndVersion, hasSize(1));
});
});
}

}
}

0 comments on commit 3bbb576

Please sign in to comment.