-
Notifications
You must be signed in to change notification settings - Fork 24.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Promote replica on the highest version node #25277
Changes from 6 commits
3aa3edc
1273e99
879eef0
cb42f65
54edaf8
ef8c79d
27469f5
4da03ea
56dfe80
3ca757b
9d9d4b7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,7 @@ | |
import org.apache.logging.log4j.Logger; | ||
import org.apache.lucene.util.CollectionUtil; | ||
import org.elasticsearch.Assertions; | ||
import org.elasticsearch.Version; | ||
import org.elasticsearch.cluster.ClusterState; | ||
import org.elasticsearch.cluster.metadata.IndexMetaData; | ||
import org.elasticsearch.cluster.metadata.MetaData; | ||
|
@@ -320,14 +321,18 @@ public ShardRouting activePrimary(ShardId shardId) { | |
/** | ||
* Returns one active replica shard for the given shard id or <code>null</code> if | ||
* no active replica is found. | ||
* | ||
* Since replicas could possibly be on nodes with a older version of ES than | ||
* the primary is, this will return replicas on the highest version of ES. | ||
* | ||
*/ | ||
public ShardRouting activeReplica(ShardId shardId) { | ||
for (ShardRouting shardRouting : assignedShards(shardId)) { | ||
if (!shardRouting.primary() && shardRouting.active()) { | ||
return shardRouting; | ||
} | ||
} | ||
return null; | ||
public ShardRouting activeReplicaWithHighestVersion(ShardId shardId) { | ||
return assignedShards(shardId).stream() | ||
.filter(shr -> !shr.primary() && shr.active()) | ||
.filter(shr -> node(shr.currentNodeId()) != null) | ||
.max(Comparator.comparing(shr -> node(shr.currentNodeId()).node(), | ||
Comparator.nullsFirst(Comparator.comparing(DiscoveryNode::getVersion)))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you readd the comment why we need to consider "null" here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Re-added this comment |
||
.orElse(null); | ||
} | ||
|
||
/** | ||
|
@@ -567,7 +572,7 @@ assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId | |
if (failedShard.relocatingNodeId() == null) { | ||
if (failedShard.primary()) { | ||
// promote active replica to primary if active replica exists (only the case for shadow replicas) | ||
ShardRouting activeReplica = activeReplica(failedShard.shardId()); | ||
ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId()); | ||
if (activeReplica == null) { | ||
moveToUnassigned(failedShard, unassignedInfo); | ||
} else { | ||
|
@@ -596,7 +601,7 @@ assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId | |
assert failedShard.active(); | ||
if (failedShard.primary()) { | ||
// promote active replica to primary if active replica exists | ||
ShardRouting activeReplica = activeReplica(failedShard.shardId()); | ||
ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId()); | ||
if (activeReplica == null) { | ||
moveToUnassigned(failedShard, unassignedInfo); | ||
} else { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,12 +19,14 @@ | |
|
||
package org.elasticsearch.cluster.routing.allocation; | ||
|
||
import com.carrotsearch.hppc.cursors.ObjectCursor; | ||
import org.apache.logging.log4j.Logger; | ||
import org.elasticsearch.Version; | ||
import org.elasticsearch.cluster.ClusterState; | ||
import org.elasticsearch.cluster.ESAllocationTestCase; | ||
import org.elasticsearch.cluster.metadata.IndexMetaData; | ||
import org.elasticsearch.cluster.metadata.MetaData; | ||
import org.elasticsearch.cluster.node.DiscoveryNode; | ||
import org.elasticsearch.cluster.node.DiscoveryNodes; | ||
import org.elasticsearch.cluster.routing.RoutingNodes; | ||
import org.elasticsearch.cluster.routing.RoutingTable; | ||
|
@@ -35,6 +37,7 @@ | |
import org.elasticsearch.common.logging.Loggers; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.index.shard.ShardId; | ||
import org.elasticsearch.test.VersionUtils; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
|
@@ -499,7 +502,7 @@ public void testFailAllReplicasInitializingOnPrimaryFail() { | |
Collections.singletonList(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0))); | ||
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2)); | ||
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); | ||
ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplica(shardId); | ||
ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId); | ||
|
||
|
||
// fail the primary shard, check replicas get removed as well... | ||
|
@@ -556,4 +559,119 @@ public void testFailAllReplicasInitializingOnPrimaryFailWhileHavingAReplicaToEle | |
ShardRouting newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard(); | ||
assertThat(newPrimaryShard, not(equalTo(primaryShardToFail))); | ||
} | ||
|
||
public void testReplicaOnNewestVersionIsPromoted() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test checks one specific scenario. I think that it can be easily generalized in the way of the |
||
AllocationService allocation = createAllocationService(Settings.builder().build()); | ||
|
||
MetaData metaData = MetaData.builder().put(IndexMetaData.builder("test") | ||
.settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(3)) .build(); | ||
|
||
RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metaData.index("test")).build(); | ||
|
||
ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) | ||
.metaData(metaData).routingTable(initialRoutingTable).build(); | ||
|
||
ShardId shardId = new ShardId(metaData.index("test").getIndex(), 0); | ||
|
||
// add a single node | ||
clusterState = ClusterState.builder(clusterState).nodes( | ||
DiscoveryNodes.builder() | ||
.add(newNode("node1-5.x", Version.V_5_6_0))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you generalize the test to use two arbitrary (but distinct) versions? i.e. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No? Currently the only situation that is valid for a mixed-major-version cluster is 5.6 and 6.0, we don't support mixed clusters of any other versions and 5.6.1 isn't out yet. I'm not sure how randomization would help here, other than triggering some other version-related failures :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this PR does more than just an ordering on 5.6/6.x. It also orders among 6.0 and 6.1 nodes, which is left untested here. Either we restrict the "Promote replica on the highest version node" logic to only order 6.x nodes before 5.6 (and leave 6.0 and 6.1 unordered) or we test that this logic also properly orders 6.0 and 6.1. I agree there is no need to test 5.1 and 6.2. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay, I randomized the versions |
||
.build(); | ||
clusterState = ClusterState.builder(clusterState).routingTable(allocation.reroute(clusterState, "reroute").routingTable()).build(); | ||
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); | ||
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(3)); | ||
|
||
// start primary shard | ||
clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)); | ||
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1)); | ||
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(3)); | ||
|
||
// add another 5.6 node | ||
clusterState = ClusterState.builder(clusterState).nodes( | ||
DiscoveryNodes.builder(clusterState.nodes()) | ||
.add(newNode("node2-5.x", Version.V_5_6_0))) | ||
.build(); | ||
|
||
// start the shards, should have 1 primary and 1 replica available | ||
clusterState = allocation.reroute(clusterState, "reroute"); | ||
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(1)); | ||
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(1)); | ||
clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)); | ||
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2)); | ||
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(2)); | ||
|
||
clusterState = ClusterState.builder(clusterState).nodes( | ||
DiscoveryNodes.builder(clusterState.nodes()) | ||
.add(newNode("node3-6.x", VersionUtils.randomVersionBetween(random(), Version.V_6_0_0_alpha1, null))) | ||
.add(newNode("node4-6.x", VersionUtils.randomVersionBetween(random(), Version.V_6_0_0_alpha1, null)))) | ||
.build(); | ||
|
||
// start all the replicas | ||
clusterState = allocation.reroute(clusterState, "reroute"); | ||
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2)); | ||
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(2)); | ||
clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)); | ||
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(4)); | ||
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(0)); | ||
|
||
ShardRouting startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId); | ||
logger.info("--> all shards allocated, replica that should be promoted: {}", startedReplica); | ||
|
||
// fail the primary shard again and make sure the correct replica is promoted | ||
ShardRouting primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard(); | ||
ClusterState newState = allocation.applyFailedShard(clusterState, primaryShardToFail); | ||
assertThat(newState, not(equalTo(clusterState))); | ||
clusterState = newState; | ||
// the primary gets allocated on another node | ||
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(3)); | ||
|
||
ShardRouting newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard(); | ||
assertThat(newPrimaryShard, not(equalTo(primaryShardToFail))); | ||
assertThat(newPrimaryShard.allocationId(), equalTo(startedReplica.allocationId())); | ||
|
||
Version replicaNodeVersion = clusterState.nodes().getDataNodes().get(startedReplica.currentNodeId()).getVersion(); | ||
assertNotNull(replicaNodeVersion); | ||
logger.info("--> shard {} got assigned to node with version {}", startedReplica, replicaNodeVersion); | ||
|
||
for (ObjectCursor<DiscoveryNode> cursor : clusterState.nodes().getDataNodes().values()) { | ||
if ("node1".equals(cursor.value.getId())) { | ||
// Skip the node that the primary was on, it doesn't have a replica so doesn't need a version check | ||
continue; | ||
} | ||
Version nodeVer = cursor.value.getVersion(); | ||
assertTrue("expected node [" + cursor.value.getId() + "] with version " + nodeVer + " to be before " + replicaNodeVersion, | ||
replicaNodeVersion.onOrAfter(nodeVer)); | ||
} | ||
|
||
startedReplica = clusterState.getRoutingNodes().activeReplicaWithHighestVersion(shardId); | ||
logger.info("--> failing primary shard a second time, should select: {}", startedReplica); | ||
|
||
// fail the primary shard again, and ensure the same thing happens | ||
ShardRouting secondPrimaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard(); | ||
newState = allocation.applyFailedShard(clusterState, secondPrimaryShardToFail); | ||
assertThat(newState, not(equalTo(clusterState))); | ||
clusterState = newState; | ||
// the primary gets allocated on another node | ||
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2)); | ||
|
||
newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard(); | ||
assertThat(newPrimaryShard, not(equalTo(secondPrimaryShardToFail))); | ||
assertThat(newPrimaryShard.allocationId(), equalTo(startedReplica.allocationId())); | ||
|
||
replicaNodeVersion = clusterState.nodes().getDataNodes().get(startedReplica.currentNodeId()).getVersion(); | ||
assertNotNull(replicaNodeVersion); | ||
logger.info("--> shard {} got assigned to node with version {}", startedReplica, replicaNodeVersion); | ||
|
||
for (ObjectCursor<DiscoveryNode> cursor : clusterState.nodes().getDataNodes().values()) { | ||
if (primaryShardToFail.currentNodeId().equals(cursor.value.getId()) || | ||
secondPrimaryShardToFail.currentNodeId().equals(cursor.value.getId())) { | ||
// Skip the node that the primary was on, it doesn't have a replica so doesn't need a version check | ||
continue; | ||
} | ||
Version nodeVer = cursor.value.getVersion(); | ||
assertTrue("expected node [" + cursor.value.getId() + "] with version " + nodeVer + " to be before " + replicaNodeVersion, | ||
replicaNodeVersion.onOrAfter(nodeVer)); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ | |
|
||
package org.elasticsearch.indices.cluster; | ||
|
||
import com.carrotsearch.hppc.cursors.ObjectCursor; | ||
import org.apache.logging.log4j.message.ParameterizedMessage; | ||
import org.elasticsearch.Version; | ||
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest; | ||
|
@@ -50,6 +51,7 @@ | |
import org.elasticsearch.index.Index; | ||
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; | ||
import org.elasticsearch.repositories.RepositoriesService; | ||
import org.elasticsearch.test.VersionUtils; | ||
import org.elasticsearch.threadpool.TestThreadPool; | ||
import org.elasticsearch.threadpool.ThreadPool; | ||
import org.elasticsearch.transport.TransportService; | ||
|
@@ -72,6 +74,8 @@ | |
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; | ||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; | ||
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; | ||
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; | ||
import static org.hamcrest.Matchers.equalTo; | ||
import static org.mockito.Mockito.mock; | ||
import static org.mockito.Mockito.when; | ||
|
||
|
@@ -139,6 +143,87 @@ public void testRandomClusterStateUpdates() { | |
logger.info("Final cluster state: {}", state); | ||
} | ||
|
||
public void testRandomClusterPromotesNewestReplica() { | ||
// we have an IndicesClusterStateService per node in the cluster | ||
final Map<DiscoveryNode, IndicesClusterStateService> clusterStateServiceMap = new HashMap<>(); | ||
ClusterState state = randomInitialClusterState(clusterStateServiceMap, MockIndicesService::new); | ||
|
||
// randomly add nodes of mixed versions | ||
logger.info("--> adding random nodes"); | ||
for (int i = 0; i < randomIntBetween(4, 8); i++) { | ||
DiscoveryNodes newNodes = DiscoveryNodes.builder(state.nodes()) | ||
.add(createRandomVersionNode()).build(); | ||
state = ClusterState.builder(state).nodes(newNodes).build(); | ||
state = cluster.reroute(state, new ClusterRerouteRequest()); // always reroute after node leave | ||
updateNodes(state, clusterStateServiceMap, MockIndicesService::new); | ||
} | ||
|
||
// Log the shard versions (for debugging if necessary) | ||
for (ObjectCursor<DiscoveryNode> cursor : state.nodes().getDataNodes().values()) { | ||
Version nodeVer = cursor.value.getVersion(); | ||
logger.info("--> node [{}] has version [{}]", cursor.value.getId(), nodeVer); | ||
} | ||
|
||
// randomly create some indices | ||
logger.info("--> creating some indices"); | ||
for (int i = 0; i < randomIntBetween(2, 5); i++) { | ||
String name = "index_" + randomAlphaOfLength(15).toLowerCase(Locale.ROOT); | ||
Settings.Builder settingsBuilder = Settings.builder() | ||
.put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 3)) | ||
.put(SETTING_NUMBER_OF_REPLICAS, randomIntBetween(1, 3)) | ||
.put("index.routing.allocation.total_shards_per_node", 1); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I removed this :) |
||
CreateIndexRequest request = new CreateIndexRequest(name, settingsBuilder.build()).waitForActiveShards(ActiveShardCount.NONE); | ||
state = cluster.createIndex(state, request); | ||
assertTrue(state.metaData().hasIndex(name)); | ||
} | ||
state = cluster.reroute(state, new ClusterRerouteRequest()); | ||
|
||
ClusterState previousState = state; | ||
// apply cluster state to nodes (incl. master) | ||
for (DiscoveryNode node : state.nodes()) { | ||
IndicesClusterStateService indicesClusterStateService = clusterStateServiceMap.get(node); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test does not require IndicesClusterStateService, only the ClusterStateChanges class. All the code in this block can go away, it does not add anything to the test. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it does use the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. randomInitialClusterState is 5 lines. I think we can duplicate that :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay, I moved it. |
||
ClusterState localState = adaptClusterStateToLocalNode(state, node); | ||
ClusterState previousLocalState = adaptClusterStateToLocalNode(previousState, node); | ||
final ClusterChangedEvent event = new ClusterChangedEvent("simulating change", localState, previousLocalState); | ||
indicesClusterStateService.applyClusterState(event); | ||
|
||
// check that cluster state has been properly applied to node | ||
assertClusterStateMatchesNodeState(localState, indicesClusterStateService); | ||
} | ||
|
||
logger.info("--> starting shards"); | ||
state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. extra semicolon |
||
state = cluster.reroute(state, new ClusterRerouteRequest()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. reroute happens as part of applyStartedShards in the line above |
||
logger.info("--> starting replicas"); | ||
state = cluster.applyStartedShards(state, state.getRoutingNodes().shardsWithState(INITIALIZING));; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. there is no guarantee that all replicas are started (as we have throttling). It's good to test the situation where not all replicas are started though, so maybe we can call applyStartedShards a random number of times. |
||
state = cluster.reroute(state, new ClusterRerouteRequest()); | ||
|
||
logger.info("--> state before failing shards: {}", state); | ||
for (int i = 0; i < randomIntBetween(5, 10); i++) { | ||
for (ShardRouting shardRouting : state.getRoutingNodes().shardsWithState(STARTED)) { | ||
if (shardRouting.primary() && randomBoolean()) { | ||
ShardRouting replicaToBePromoted = state.getRoutingNodes() | ||
.activeReplicaWithHighestVersion(shardRouting.shardId()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you're testing the method
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I changed the test to verify candidates using without the activeReplicaWithHighestVersion method |
||
if (replicaToBePromoted != null) { | ||
Version replicaNodeVersion = state.nodes().getDataNodes() | ||
.get(replicaToBePromoted.currentNodeId()).getVersion(); | ||
List<FailedShard> shardsToFail = new ArrayList<>(); | ||
logger.info("--> found replica that should be promoted: {}", replicaToBePromoted); | ||
logger.info("--> failing shard {}", shardRouting); | ||
shardsToFail.add(new FailedShard(shardRouting, "failed primary", new Exception())); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you're testing only one failure at a time. |
||
state = cluster.applyFailedShards(state, shardsToFail); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. an alternative to explicit shard failing is to remove nodes where the shards are allocated (i.e. when a node disconnects from the cluster). |
||
ShardRouting newPrimary = state.routingTable().index(shardRouting.index()) | ||
.shard(shardRouting.id()).primaryShard(); | ||
|
||
assertThat(newPrimary.allocationId().getId(), | ||
equalTo(replicaToBePromoted.allocationId().getId())); | ||
} | ||
} | ||
state = cluster.reroute(state, new ClusterRerouteRequest()); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* This test ensures that when a node joins a brand new cluster (different cluster UUID), | ||
* different from the cluster it was previously a part of, the in-memory index data structures | ||
|
@@ -388,6 +473,16 @@ protected DiscoveryNode createNode(DiscoveryNode.Role... mustHaveRoles) { | |
Version.CURRENT); | ||
} | ||
|
||
protected DiscoveryNode createRandomVersionNode(DiscoveryNode.Role... mustHaveRoles) { | ||
Set<DiscoveryNode.Role> roles = new HashSet<>(randomSubsetOf(Sets.newHashSet(DiscoveryNode.Role.values()))); | ||
for (DiscoveryNode.Role mustHaveRole : mustHaveRoles) { | ||
roles.add(mustHaveRole); | ||
} | ||
final String id = String.format(Locale.ROOT, "node_%03d", nodeIdGenerator.incrementAndGet()); | ||
return new DiscoveryNode(id, id, buildNewFakeTransportAddress(), Collections.emptyMap(), roles, | ||
VersionUtils.randomVersionBetween(random(), Version.V_5_6_0, null)); | ||
} | ||
|
||
private static ClusterState adaptClusterStateToLocalNode(ClusterState state, DiscoveryNode node) { | ||
return ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(node.getId())).build(); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unused import?