diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java
index afbf6363618a5..6cda2a04bd748 100644
--- a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java
+++ b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java
@@ -388,6 +388,29 @@ public void handleCommit(ApplyCommitRequest applyCommit) {
         assert getLastCommittedConfiguration().equals(getLastAcceptedConfiguration());
     }
 
+    /**
+     * Asserts the internal consistency of this coordination state. The checks are plain {@code assert} statements, so
+     * they only take effect when the JVM runs with assertions enabled.
+     */
+    public void invariant() {
+        assert getLastAcceptedTerm() <= getCurrentTerm()
+            : "last accepted term [" + getLastAcceptedTerm() + "] exceeds current term [" + getCurrentTerm() + "]";
+        assert electionWon() == isElectionQuorum(joinVotes)
+            : "electionWon [" + electionWon() + "] disagrees with isElectionQuorum(joinVotes)";
+        if (electionWon()) {
+            assert getLastPublishedVersion() >= getLastAcceptedVersion()
+                : "last published version [" + getLastPublishedVersion() + "] is below last accepted version ["
+                + getLastAcceptedVersion() + "]";
+        } else {
+            assert getLastPublishedVersion() == 0L
+                : "last published version [" + getLastPublishedVersion() + "] is non-zero although election is not won";
+        }
+        assert electionWon() == false || startedJoinSinceLastReboot
+            : "electionWon is true but startedJoinSinceLastReboot is false";
+        assert publishVotes.isEmpty() || electionWon()
+            : "publishVotes is non-empty although election is not won";
+    }
+
     /**
      * Pluggable persistence layer for {@link CoordinationState}.
      *
diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java
index e7f61efa69054..fb702f706da34 100644
--- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java
@@ -35,11 +35,18 @@
 import org.elasticsearch.test.EqualsHashCodeTestUtils;
 import org.junit.Before;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
+import static java.util.stream.Collectors.toSet;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
 
 public class CoordinationStateTests extends ESTestCase {
 
@@ -752,6 +759,15 @@ public void testVoteCollection() {
         });
     }
 
+    /**
+     * Drives a cluster of one to five simulated nodes through 10000 random steps and checks the per-node and
+     * system-wide invariants.
+     */
+    public void testSafety() {
+        Cluster cluster = new Cluster(randomIntBetween(1, 5));
+        cluster.runRandomly(10000);
+    }
+
     public static CoordinationState createCoordinationState(PersistedState storage, DiscoveryNode localNode) {
         final Settings initialSettings = Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), localNode.getId()).build();
         return new CoordinationState(initialSettings, localNode, storage);
@@ -827,4 +843,202 @@ public ClusterState getLastAcceptedState() {
             return acceptedState;
         }
     }
+
+    /**
+     * A single simulated node: a {@link CoordinationState} built on top of a {@link PersistedState} that is kept when
+     * the node is rebooted.
+     */
+    static class ClusterNode {
+
+        final DiscoveryNode localNode;
+        final PersistedState persistedState;
+        CoordinationState state;
+
+        ClusterNode(DiscoveryNode localNode) {
+            this.localNode = localNode;
+            persistedState = new InMemoryPersistedState(0L,
+                clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L));
+            state = new CoordinationState(Settings.EMPTY, localNode, persistedState);
+        }
+
+        /** Replaces the coordination state with a fresh instance created over the same persisted state. */
+        void reboot() {
+            state = new CoordinationState(Settings.EMPTY, localNode, persistedState);
+        }
+
+        /** Sets an initial state whose accepted and committed configurations are both {@code initialConfig}. */
+        void setInitialState(VotingConfiguration initialConfig, long initialValue) {
+            final ClusterState.Builder builder = ClusterState.builder(state.getLastAcceptedState()).incrementVersion();
+            builder.lastAcceptedConfiguration(initialConfig);
+            builder.lastCommittedConfiguration(initialConfig);
+            state.setInitialState(setValue(builder.build(), initialValue));
+        }
+    }
+
+    /**
+     * A simulated cluster: a fixed list of nodes plus a list of all messages sent so far. Messages are never removed from
+     * that list, so the same message can be picked for delivery more than once.
+     */
+    static class Cluster {
+
+        final List<Message> messages;
+        final List<ClusterNode> clusterNodes;
+        final VotingConfiguration initialConfiguration;
+        final long initialValue;
+
+        Cluster(int numNodes) {
+            messages = new ArrayList<>();
+
+            clusterNodes = IntStream.range(0, numNodes)
+                .mapToObj(i -> new DiscoveryNode("node_" + i, buildNewFakeTransportAddress(), Version.CURRENT))
+                .map(ClusterNode::new)
+                .collect(Collectors.toList());
+
+            initialConfiguration = randomVotingConfig();
+            initialValue = randomLong();
+        }
+
+        /** A payload sent from {@code sourceNode} to {@code targetNode}. */
+        static class Message {
+            final DiscoveryNode sourceNode;
+            final DiscoveryNode targetNode;
+            final Object payload;
+
+            Message(DiscoveryNode sourceNode, DiscoveryNode targetNode, Object payload) {
+                this.sourceNode = sourceNode;
+                this.targetNode = targetNode;
+                this.payload = payload;
+            }
+        }
+
+        /** Queues {@code payload} as a message going back from the target of {@code m} to its source. */
+        void reply(Message m, Object payload) {
+            messages.add(new Message(m.targetNode, m.sourceNode, payload));
+        }
+
+        /** Queues {@code payload} as one message from {@code sourceNode} to each node of the cluster. */
+        void broadcast(DiscoveryNode sourceNode, Object payload) {
+            messages.addAll(clusterNodes.stream().map(cn -> new Message(sourceNode, cn.localNode, payload)).collect(Collectors.toList()));
+        }
+
+        Optional<ClusterNode> getNode(DiscoveryNode node) {
+            return clusterNodes.stream().filter(cn -> cn.localNode.equals(node)).findFirst();
+        }
+
+        /** Builds a voting configuration from the ids of a random, non-empty subset of the cluster nodes. */
+        VotingConfiguration randomVotingConfig() {
+            return new VotingConfiguration(
+                randomSubsetOf(randomIntBetween(1, clusterNodes.size()), clusterNodes).stream()
+                    .map(cn -> cn.localNode.getId()).collect(toSet()));
+        }
+
+        /**
+         * Dispatches {@code message} on its payload type to the matching handler of the target node's coordination
+         * state; responses returned by a handler are queued as new messages.
+         *
+         * @throws CoordinationStateRejectedException if the target node is not part of this cluster
+         */
+        void applyMessage(Message message) {
+            final Optional<ClusterNode> maybeNode = getNode(message.targetNode);
+            if (maybeNode.isPresent() == false) {
+                throw new CoordinationStateRejectedException("node not available");
+            } else {
+                final Object payload = message.payload;
+                if (payload instanceof StartJoinRequest) {
+                    reply(message, maybeNode.get().state.handleStartJoin((StartJoinRequest) payload));
+                } else if (payload instanceof Join) {
+                    maybeNode.get().state.handleJoin((Join) payload);
+                } else if (payload instanceof PublishRequest) {
+                    reply(message, maybeNode.get().state.handlePublishRequest((PublishRequest) payload));
+                } else if (payload instanceof PublishResponse) {
+                    maybeNode.get().state.handlePublishResponse(message.sourceNode, (PublishResponse) payload)
+                        .ifPresent(ac -> broadcast(message.targetNode, ac));
+                } else if (payload instanceof ApplyCommitRequest) {
+                    maybeNode.get().state.handleCommit((ApplyCommitRequest) payload);
+                } else {
+                    throw new AssertionError("unknown message type");
+                }
+            }
+        }
+
+        /**
+         * Performs {@code iterations} random steps (start-joins, initial states, reboots, client values, message
+         * deliveries), checking the node invariants after each step that was not rejected and the system invariants
+         * once at the end.
+         */
+        void runRandomly(int iterations) {
+            final long maxTerm = 4;
+            long nextTerm = 1;
+            for (int i = 0; i < iterations; i++) {
+                try {
+                    if (rarely() && nextTerm < maxTerm) {
+                        final long term = rarely() ? randomLongBetween(0, maxTerm + 1) : nextTerm++;
+                        final StartJoinRequest startJoinRequest = new StartJoinRequest(randomFrom(clusterNodes).localNode, term);
+                        broadcast(startJoinRequest.getSourceNode(), startJoinRequest);
+                    } else if (rarely()) {
+                        randomFrom(clusterNodes).setInitialState(initialConfiguration, initialValue);
+                    } else if (rarely() && rarely()) {
+                        randomFrom(clusterNodes).reboot();
+                    } else if (rarely()) {
+                        final List<ClusterNode> masterNodes = clusterNodes.stream().filter(cn -> cn.state.electionWon())
+                            .collect(Collectors.toList());
+                        if (masterNodes.isEmpty() == false) {
+                            final ClusterNode clusterNode = randomFrom(masterNodes);
+                            final long term = rarely() ? randomLongBetween(0, maxTerm + 1) : clusterNode.state.getCurrentTerm();
+                            final long version = rarely() ? randomIntBetween(0, 5) : clusterNode.state.getLastPublishedVersion() + 1;
+                            final VotingConfiguration acceptedConfig = rarely() ? randomVotingConfig() :
+                                clusterNode.state.getLastAcceptedConfiguration();
+                            final PublishRequest publishRequest = clusterNode.state.handleClientValue(
+                                clusterState(term, version, clusterNode.localNode, clusterNode.state.getLastCommittedConfiguration(),
+                                    acceptedConfig, randomLong()));
+                            broadcast(clusterNode.localNode, publishRequest);
+                        }
+                    } else if (messages.isEmpty() == false) {
+                        applyMessage(randomFrom(messages));
+                    }
+
+                    // check node invariants after each iteration
+                    clusterNodes.forEach(cn -> cn.state.invariant());
+                } catch (CoordinationStateRejectedException e) {
+                    // ignore rejected actions; note that the node invariant check above is then skipped for this iteration
+                }
+            }
+
+            // check system invariants
+            invariant();
+        }
+
+        /** Checks the system-wide invariants over all publish requests that were sent. */
+        void invariant() {
+            // one master per term
+            messages.stream().filter(m -> m.payload instanceof PublishRequest)
+                .collect(Collectors.groupingBy(m -> ((PublishRequest) m.payload).getAcceptedState().term()))
+                .forEach((term, publishMessages) -> {
+                    Set<DiscoveryNode> mastersForTerm = publishMessages.stream().collect(Collectors.groupingBy(m -> m.sourceNode)).keySet();
+                    assertThat("Multiple masters " + mastersForTerm + " for term " + term, mastersForTerm, hasSize(1));
+                });
+
+            // unique cluster state per (term, version) pair
+            messages.stream().filter(m -> m.payload instanceof PublishRequest)
+                .map(m -> ((PublishRequest) m.payload).getAcceptedState())
+                .collect(Collectors.groupingBy(ClusterState::term))
+                .forEach((term, clusterStates) -> {
+                    clusterStates.stream().collect(Collectors.groupingBy(ClusterState::version))
+                        .forEach((version, clusterStates1) -> {
+                            Set<String> clusterStateUUIDsForTermAndVersion = clusterStates1.stream().collect(Collectors.groupingBy(
+                                ClusterState::stateUUID
+                            )).keySet();
+                            assertThat("Multiple cluster states " + clusterStates1 + " for term " + term + " and version " + version,
+                                clusterStateUUIDsForTermAndVersion, hasSize(1));
+
+                            Set<Long> clusterStateValuesForTermAndVersion = clusterStates1.stream().collect(Collectors.groupingBy(
+                                CoordinationStateTests::value
+                            )).keySet();
+
+                            assertThat("Multiple cluster states " + clusterStates1 + " for term " + term + " and version " + version,
+                                clusterStateValuesForTermAndVersion, hasSize(1));
+                        });
+                });
+        }
+
+    }
 }