Skip to content

Commit

Permalink
Add linearizability checker for coordination layer (#36943)
Browse files Browse the repository at this point in the history
Checks that the core coordination algorithm implemented as part of Zen2 (#32006) supports
linearizable semantics. This commit adds a linearizability checker based on the Wing and Gong
graph search algorithm with support for compositional checking and activates these checks for all
CoordinatorTests.
  • Loading branch information
ywelsch committed Feb 26, 2019
1 parent b40628f commit d42f422
Show file tree
Hide file tree
Showing 3 changed files with 806 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.NodeConnectionsService;
Expand All @@ -37,6 +38,8 @@
import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState;
import org.elasticsearch.cluster.coordination.Coordinator.Mode;
import org.elasticsearch.cluster.coordination.CoordinatorTests.Cluster.ClusterNode;
import org.elasticsearch.cluster.coordination.LinearizabilityChecker.History;
import org.elasticsearch.cluster.coordination.LinearizabilityChecker.SequentialSpec;
import org.elasticsearch.cluster.metadata.Manifest;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand All @@ -47,6 +50,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
Expand All @@ -60,8 +64,8 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.discovery.zen.PublishClusterStateStats;
import org.elasticsearch.discovery.SeedHostsProvider.HostsResolver;
import org.elasticsearch.discovery.zen.PublishClusterStateStats;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.MetaStateService;
import org.elasticsearch.gateway.MockGatewayMetaState;
Expand Down Expand Up @@ -103,8 +107,6 @@
import static java.util.Collections.emptySet;
import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.BOOTSTRAP_PLACEHOLDER_PREFIX;
import static org.elasticsearch.cluster.coordination.CoordinationStateTests.clusterState;
import static org.elasticsearch.cluster.coordination.CoordinationStateTests.setValue;
import static org.elasticsearch.cluster.coordination.CoordinationStateTests.value;
import static org.elasticsearch.cluster.coordination.Coordinator.Mode.CANDIDATE;
import static org.elasticsearch.cluster.coordination.Coordinator.Mode.FOLLOWER;
import static org.elasticsearch.cluster.coordination.Coordinator.Mode.LEADER;
Expand All @@ -119,11 +121,11 @@
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_INTERVAL_SETTING;
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING;
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING;
import static org.elasticsearch.cluster.coordination.Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION;
import static org.elasticsearch.cluster.coordination.NoMasterBlockService.NO_MASTER_BLOCK_ALL;
import static org.elasticsearch.cluster.coordination.NoMasterBlockService.NO_MASTER_BLOCK_ID;
import static org.elasticsearch.cluster.coordination.NoMasterBlockService.NO_MASTER_BLOCK_SETTING;
import static org.elasticsearch.cluster.coordination.NoMasterBlockService.NO_MASTER_BLOCK_WRITES;
import static org.elasticsearch.cluster.coordination.Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION;
import static org.elasticsearch.discovery.PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
Expand Down Expand Up @@ -945,7 +947,7 @@ private void testAppliesNoMasterBlock(String noMasterBlockSetting, ClusterBlock
final Builder settingsBuilder = Settings.builder().put(cs.metaData().persistentSettings());
settingsBuilder.put(NO_MASTER_BLOCK_SETTING.getKey(), noMasterBlockSetting);
return ClusterState.builder(cs).metaData(MetaData.builder(cs.metaData()).persistentSettings(settingsBuilder.build())).build();
});
}, (source, e) -> {});
cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "committing setting update");

leader.disconnect();
Expand Down Expand Up @@ -1142,6 +1144,8 @@ class Cluster {
private final Set<String> disconnectedNodes = new HashSet<>();
private final Set<String> blackholedNodes = new HashSet<>();
private final Map<Long, ClusterState> committedStatesByVersion = new HashMap<>();
private final LinearizabilityChecker linearizabilityChecker = new LinearizabilityChecker();
private final History history = new History();

private final Function<DiscoveryNode, MockPersistedState> defaultPersistedStateSupplier = MockPersistedState::new;

Expand Down Expand Up @@ -1221,6 +1225,7 @@ void runRandomly(boolean allowReboots) {
cleanupActions.add(() -> disruptStorage = false);

final int randomSteps = scaledRandomIntBetween(10, 10000);
final int keyRange = randomSteps / 50; // for randomized writes and reads
logger.info("--> start of safety phase of at least [{}] steps", randomSteps);

deterministicTaskQueue.setExecutionDelayVariabilityMillis(EXTREME_DELAY_VARIABILITY);
Expand All @@ -1239,13 +1244,22 @@ void runRandomly(boolean allowReboots) {
}

try {
if (rarely()) {
if (finishTime == -1 && randomBoolean() && randomBoolean() && randomBoolean()) {
final ClusterNode clusterNode = getAnyNodePreferringLeaders();
final int key = randomIntBetween(0, keyRange);
final int newValue = randomInt();
clusterNode.onNode(() -> {
logger.debug("----> [runRandomly {}] proposing new value [{}] to [{}]",
thisStep, newValue, clusterNode.getId());
clusterNode.submitValue(newValue);
clusterNode.submitValue(key, newValue);
}).run();
} else if (finishTime == -1 && randomBoolean() && randomBoolean() && randomBoolean()) {
final ClusterNode clusterNode = getAnyNodePreferringLeaders();
final int key = randomIntBetween(0, keyRange);
clusterNode.onNode(() -> {
logger.debug("----> [runRandomly {}] reading value from [{}]",
thisStep, clusterNode.getId());
clusterNode.readValue(key);
}).run();
} else if (rarely()) {
final ClusterNode clusterNode = getAnyNodePreferringLeaders();
Expand Down Expand Up @@ -1426,6 +1440,10 @@ void stabilise(long stabilisationDurationMillis) {
lastAcceptedState.getLastCommittedConfiguration(), equalTo(lastAcceptedState.getLastAcceptedConfiguration()));
assertThat("current configuration is already optimal",
leader.improveConfiguration(lastAcceptedState), sameInstance(lastAcceptedState));

logger.info("checking linearizability of history with size {}: {}", history.size(), history);
assertTrue("history not linearizable: " + history, linearizabilityChecker.isLinearizable(spec, history, i -> null));
logger.info("linearizability check completed");
}

void bootstrapIfNecessary() {
Expand Down Expand Up @@ -1802,14 +1820,55 @@ void submitSetAutoShrinkVotingConfiguration(final boolean autoShrinkVotingConfig
.put(CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION.getKey(), autoShrinkVotingConfiguration)
.build())
.build())
.build());
.build(), (source, e) -> {});
}

AckCollector submitValue(final long value) {
return submitUpdateTask("new value [" + value + "]", cs -> setValue(cs, value));
return submitValue(0, value);
}

AckCollector submitUpdateTask(String source, UnaryOperator<ClusterState> clusterStateUpdate) {
/**
 * Submits a cluster-state update that writes {@code value} under the register identified by {@code key},
 * recording the operation in the linearizability {@code history} as an invocation/response pair.
 * The response carries the PREVIOUS value of the register (read from {@code oldState}), matching the
 * register model used by {@code spec} where a successful write returns the prior value.
 *
 * @param key   register key to write (settings key {@code "value_" + key})
 * @param value new value to store
 * @return collector of publication acks for this update
 */
AckCollector submitValue(final int key, final long value) {
// record the invocation before submitting, so the history covers the full duration of the operation
final int eventId = history.invoke(new Tuple<>(key, value));
return submitUpdateTask("new value [" + value + "]", cs -> setValue(cs, key, value), new ClusterStateTaskListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
// successful write: respond with the register's previous value, as required by the register spec
history.respond(eventId, value(oldState, key));
}

@Override
public void onNoLongerMaster(String source) {
// in this case, we know for sure that event was not processed by the system and will not change history
// remove event to help avoid bloated history and state space explosion in linearizability checker
history.remove(eventId);
}

@Override
public void onFailure(String source, Exception e) {
// do not remove event from history, the write might still take place
// instead, complete history when checking for linearizability
}
});
}

/**
 * Performs a linearizable read of the register identified by {@code key} by submitting a no-op
 * cluster-state update (a rebuilt but unchanged state) and responding to the history with the value
 * observed in the resulting committed state. Reads are modeled in the history with a {@code null} input.
 *
 * @param key register key to read (settings key {@code "value_" + key})
 */
void readValue(int key) {
// null input marks this history event as a read in the register spec
final int eventId = history.invoke(new Tuple<>(key, null));
submitUpdateTask("read value", cs -> ClusterState.builder(cs).build(), new ClusterStateTaskListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
// the read "takes effect" at the point this no-op update is committed; report the value seen then
history.respond(eventId, value(newState, key));
}

@Override
public void onFailure(String source, Exception e) {
// reads do not change state
// remove event to help avoid bloated history and state space explosion in linearizability checker
history.remove(eventId);
}
});
}

AckCollector submitUpdateTask(String source, UnaryOperator<ClusterState> clusterStateUpdate,
ClusterStateTaskListener taskListener) {
final AckCollector ackCollector = new AckCollector();
onNode(() -> {
logger.trace("[{}] submitUpdateTask: enqueueing [{}]", localNode.getId(), source);
Expand All @@ -1826,15 +1885,23 @@ public ClusterState execute(ClusterState currentState) {
@Override
public void onFailure(String source, Exception e) {
logger.debug(() -> new ParameterizedMessage("failed to publish: [{}]", source), e);
taskListener.onFailure(source, e);
}

@Override
public void onNoLongerMaster(String source) {
logger.trace("no longer master: [{}]", source);
taskListener.onNoLongerMaster(source);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
updateCommittedStates();
ClusterState state = committedStatesByVersion.get(newState.version());
assertNotNull("State not committed : " + newState.toString(), state);
assertEquals(value(state), value(newState));
assertStateEquals(state, newState);
logger.trace("successfully published: [{}]", newState);
taskListener.clusterStateProcessed(source, oldState, newState);
}
});
}).run();
Expand Down Expand Up @@ -2068,4 +2135,85 @@ enum ClusterStateApplyResponse {
HANG,
}

/**
 * Returns a copy of {@code clusterState} whose persistent settings store {@code value}
 * under the register key {@code "value_" + key}; all other settings are carried over.
 *
 * @param clusterState state to derive from (not modified)
 * @param key          register key
 * @param value        value to record
 * @return the derived cluster state
 */
public ClusterState setValue(ClusterState clusterState, int key, long value) {
    final Settings updatedSettings = Settings.builder()
        .put(clusterState.metaData().persistentSettings())
        .put("value_" + key, value)
        .build();
    final MetaData updatedMetaData = MetaData.builder(clusterState.metaData())
        .persistentSettings(updatedSettings)
        .build();
    return ClusterState.builder(clusterState).metaData(updatedMetaData).build();
}

/**
 * Convenience overload: reads the default register (key 0) from the given cluster state.
 *
 * @param clusterState state to read from
 * @return value of register 0, or 0L if never written
 */
public long value(ClusterState clusterState) {
return value(clusterState, 0);
}

/**
 * Reads the register identified by {@code key} from the cluster state's persistent settings.
 *
 * @param clusterState state to read from
 * @param key          register key
 * @return stored value, or 0L when the register has never been written
 */
public long value(ClusterState clusterState, int key) {
    final String settingKey = "value_" + key;
    return clusterState.metaData().persistentSettings().getAsLong(settingKey, 0L);
}

/**
 * Asserts that two cluster states agree on version, term, the set of register keys,
 * and the value stored under every register key.
 *
 * @param clusterState1 expected state
 * @param clusterState2 actual state
 */
public void assertStateEquals(ClusterState clusterState1, ClusterState clusterState2) {
    assertEquals(clusterState1.version(), clusterState2.version());
    assertEquals(clusterState1.term(), clusterState2.term());
    final Set<Integer> keys = keySet(clusterState1);
    assertEquals(keys, keySet(clusterState2));
    for (final int key : keys) {
        assertEquals(value(clusterState1, key), value(clusterState2, key));
    }
}

/**
 * Extracts the set of register keys present in the cluster state's persistent settings,
 * i.e. the integer suffixes of all settings named {@code "value_<n>"}.
 *
 * @param clusterState state to inspect
 * @return set of register keys (possibly empty)
 */
public Set<Integer> keySet(ClusterState clusterState) {
    final String prefix = "value_";
    final Set<Integer> keys = new HashSet<>();
    for (final String settingName : clusterState.metaData().persistentSettings().keySet()) {
        if (settingName.startsWith(prefix)) {
            keys.add(Integer.valueOf(settingName.substring(prefix.length())));
        }
    }
    return keys;
}

/**
 * Simple register model. Writes are modeled by providing an integer input. Reads are modeled by providing null as input.
 * Responses that time out are modeled by returning null. Successful writes return the previous value of the register.
 */
private final SequentialSpec spec = new LinearizabilityChecker.KeyedSpec() {
@Override
public Object getKey(Object value) {
// history events are Tuple<key, value>; v1 is the register key used for compositional checking
return ((Tuple) value).v1();
}

@Override
public Object getValue(Object value) {
// v2 is the written value for writes, or null for reads
return ((Tuple) value).v2();
}

@Override
public Object initialState() {
// registers start at 0L, matching value()'s default when the setting is absent
return 0L;
}

@Override
public Object nextState(Object currentState, Object input, Object output) {
// null input is read, non-null is write
if (input == null) {
// history is completed with null, simulating timeout, which assumes that read went through
if (output == null || currentState.equals(output)) {
// a read is legal only if it observed the current register value (or timed out)
return Optional.of(currentState);
}
return Optional.empty();
} else {
if (output == null || currentState.equals(output)) {
// history is completed with null, simulating timeout, which assumes that write went through
// a successful write must have returned the previous value; the register then holds the input
return Optional.of(input);
}
return Optional.empty();
}
}
};

/**
 * Sanity-checks the register {@code spec}: initial value, write/read transitions,
 * timeout modeling (null output), and rejection of a read that observes a stale value.
 */
public void testRegisterSpecConsistency() {
assertThat(spec.initialState(), equalTo(0L));
assertThat(spec.nextState(7, 42, 7), equalTo(Optional.of(42))); // successful write 42 returns previous value 7
assertThat(spec.nextState(7, 42, null), equalTo(Optional.of(42))); // write 42 times out
assertThat(spec.nextState(7, null, 7), equalTo(Optional.of(7))); // successful read
assertThat(spec.nextState(7, null, null), equalTo(Optional.of(7))); // read times out
assertThat(spec.nextState(7, null, 42), equalTo(Optional.empty())); // read of stale/wrong value is not linearizable
}

}
Loading

0 comments on commit d42f422

Please sign in to comment.