Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Fail when using multiple data paths (#72184)" #79116

Merged
merged 10 commits on Oct 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/
package org.elasticsearch.cluster.routing.allocation.decider;

import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterInfoServiceUtils;
Expand All @@ -24,6 +25,8 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -44,6 +47,8 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class MockDiskUsagesIT extends ESIntegTestCase {
Expand Down Expand Up @@ -315,6 +320,83 @@ public void testDoesNotExceedLowWatermarkWhenRebalancing() throws Exception {
assertThat("node2 has 1 shard", getShardCountByNodeId().get(nodeIds.get(2)), equalTo(1));
}

/**
 * Checks that when one data path of a node with two data paths rises above the
 * high disk watermark, all shards are relocated off that specific path, while
 * shards already on the node's healthy path are left where they are.
 */
public void testMovesShardsOffSpecificDataPathAboveWatermark() throws Exception {

// start one node with two data paths
final Path pathOverWatermark = createTempDir();
final Settings.Builder twoPathSettings = Settings.builder();
// randomize the ordering of the two paths so the test does not depend on path position
if (randomBoolean()) {
twoPathSettings.putList(Environment.PATH_DATA_SETTING.getKey(), createTempDir().toString(), pathOverWatermark.toString());
} else {
twoPathSettings.putList(Environment.PATH_DATA_SETTING.getKey(), pathOverWatermark.toString(), createTempDir().toString());
}
internalCluster().startNode(twoPathSettings);
// only one node has been started so far, so index 0 is the two-path node
final String nodeWithTwoPaths = client().admin().cluster().prepareNodesInfo().get().getNodes().get(0).getNode().getId();

// other two nodes have one data path each
internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir()));
internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir()));

final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService();

// prevent any effects from in-flight recoveries, since we are only simulating a 100-byte disk
clusterInfoService.setShardSizeFunctionAndRefresh(shardRouting -> 0L);

// start with all paths below the watermark
clusterInfoService.setDiskUsageFunctionAndRefresh((discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(10, 100)));

// low == high at 90% so exceeding 90% usage immediately triggers relocation; flood stage at 100% is effectively disabled
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.put(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "90%")
.put(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "90%")
.put(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "100%")));

// capture node IDs in routing-node order for the per-node shard-count assertions below
final List<String> nodeIds = StreamSupport.stream(client().admin().cluster().prepareState().get().getState()
.getRoutingNodes().spliterator(), false).map(RoutingNode::nodeId).collect(Collectors.toList());

assertAcked(prepareCreate("test").setSettings(Settings.builder().put("number_of_shards", 6).put("number_of_replicas", 0)));

ensureGreen("test");

// 6 shards over 3 nodes should balance to 2 shards per node
{
final Map<String, Integer> shardCountByNodeId = getShardCountByNodeId();
assertThat("node0 has 2 shards", shardCountByNodeId.get(nodeIds.get(0)), equalTo(2));
assertThat("node1 has 2 shards", shardCountByNodeId.get(nodeIds.get(1)), equalTo(2));
assertThat("node2 has 2 shards", shardCountByNodeId.get(nodeIds.get(2)), equalTo(2));
}

// record how many shards sit on the healthy path of the two-path node so we can
// later verify that none of them were moved unnecessarily
final long shardsOnGoodPath = Arrays.stream(client().admin().indices().prepareStats("test").get().getShards())
.filter(shardStats -> shardStats.getShardRouting().currentNodeId().equals(nodeWithTwoPaths)
&& shardStats.getDataPath().startsWith(pathOverWatermark.toString()) == false).count();
logger.info("--> shards on good path: [{}]", shardsOnGoodPath);

// disable rebalancing, or else we might move shards back onto the over-full path since we're not faking that
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE)));

// one of the paths on node0 suddenly exceeds the high watermark
clusterInfoService.setDiskUsageFunctionAndRefresh((discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100L,
fsInfoPath.getPath().startsWith(pathOverWatermark.toString()) ? between(0, 9) : between(10, 100)));

logger.info("--> waiting for shards to relocate off path [{}]", pathOverWatermark);

// wait until no shard of the test index reports a data path under the over-full path
assertBusy(() -> {
for (final ShardStats shardStats : client().admin().indices().prepareStats("test").get().getShards()) {
assertThat(shardStats.getDataPath(), not(startsWith(pathOverWatermark.toString())));
}
});

ensureGreen("test");

// re-check after the cluster is green: relocations must not have landed back on the full path
for (final ShardStats shardStats : client().admin().indices().prepareStats("test").get().getShards()) {
assertThat(shardStats.getDataPath(), not(startsWith(pathOverWatermark.toString())));
}

assertThat("should not have moved any shards off of the path that wasn't too full",
Arrays.stream(client().admin().indices().prepareStats("test").get().getShards())
.filter(shardStats -> shardStats.getShardRouting().currentNodeId().equals(nodeWithTwoPaths)
&& shardStats.getDataPath().startsWith(pathOverWatermark.toString()) == false).count(), equalTo(shardsOnGoodPath));
}

private Map<String, Integer> getShardCountByNodeId() {
final Map<String, Integer> shardCountByNodeId = new HashMap<>();
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -205,4 +206,38 @@ public void testUpgradeDataFolder() throws IOException, InterruptedException {
ensureYellow("test");
assertHitCount(client().prepareSearch().setQuery(matchAllQuery()).get(), 1L);
}

/**
 * Checks that a node refuses to start when its configured data paths contain
 * metadata written by two different nodes, and that each node can still start
 * on its own paths even when an extra, empty path is appended.
 */
public void testFailsToStartOnDataPathsFromMultipleNodes() throws IOException {
final List<String> nodes = internalCluster().startNodes(2);
ensureStableCluster(2);

// remember each node's data paths before stopping them
final List<String> node0DataPaths = Environment.PATH_DATA_SETTING.get(internalCluster().dataPathSettings(nodes.get(0)));
final List<String> node1DataPaths = Environment.PATH_DATA_SETTING.get(internalCluster().dataPathSettings(nodes.get(1)));

final List<String> allDataPaths = new ArrayList<>(node0DataPaths);
allDataPaths.addAll(node1DataPaths);

// stop both nodes so their data paths can be inspected and reused offline
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodes.get(1)));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodes.get(0)));

// reading node metadata across both nodes' paths must fail: the paths belong to different node IDs
IllegalStateException illegalStateException = expectThrows(IllegalStateException.class,
() -> PersistedClusterStateService.nodeMetadata(allDataPaths.stream().map(PathUtils::get).toArray(Path[]::new)));

assertThat(illegalStateException.getMessage(), containsString("unexpected node ID in metadata"));

// starting a node over the combined paths must fail for the same reason
illegalStateException = expectThrows(IllegalStateException.class,
() -> internalCluster().startNode(Settings.builder().putList(Environment.PATH_DATA_SETTING.getKey(), allDataPaths)));

assertThat(illegalStateException.getMessage(), containsString("unexpected node ID in metadata"));

// a node's own paths plus a brand-new empty path is acceptable
final List<String> node0DataPathsPlusOne = new ArrayList<>(node0DataPaths);
node0DataPathsPlusOne.add(createTempDir().toString());
internalCluster().startNode(Settings.builder().putList(Environment.PATH_DATA_SETTING.getKey(), node0DataPathsPlusOne));

final List<String> node1DataPathsPlusOne = new ArrayList<>(node1DataPaths);
node1DataPathsPlusOne.add(createTempDir().toString());
internalCluster().startNode(Settings.builder().putList(Environment.PATH_DATA_SETTING.getKey(), node1DataPathsPlusOne));

ensureStableCluster(2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.elasticsearch.env.NodeEnvironment.INDICES_FOLDER;
import static org.elasticsearch.gateway.MetadataStateFormat.STATE_DIR_NAME;
Expand Down Expand Up @@ -240,8 +241,7 @@ public void testListenersInvokedWhenIndexHasLeftOverShard() throws Exception {
.build()
);

// TODO: decide if multiple leftovers should/can be tested without MDP
final Index[] leftovers = new Index[1];
final Index[] leftovers = new Index[between(1, 3)];
logger.debug("--> creating [{}] leftover indices on data node [{}]", leftovers.length, dataNode);
for (int i = 0; i < leftovers.length; i++) {
final String indexName = "index-" + i;
Expand Down Expand Up @@ -275,19 +275,22 @@ public void testListenersInvokedWhenIndexHasLeftOverShard() throws Exception {
final Index index = internalCluster().clusterService(masterNode).state().metadata().index(indexName).getIndex();
logger.debug("--> index [{}] created", index);

final Path dataPath = createTempDir();
final Path shardPath = dataPath.resolve(INDICES_FOLDER).resolve(index.getUUID()).resolve("0");
Files.createDirectories(shardPath);
final List<Path> dataPaths = new ArrayList<>();
for (int i = 0; i < leftovers.length; i++) {
final Path dataPath = createTempDir();
dataPaths.add(dataPath);
final Path shardPath = dataPath.resolve(INDICES_FOLDER).resolve(index.getUUID()).resolve("0");
Files.createDirectories(shardPath);
final Path leftoverPath = dataDirWithLeftOverShards.resolve(INDICES_FOLDER).resolve(leftovers[i].getUUID()).resolve("0");
Files.move(leftoverPath.resolve(STATE_DIR_NAME), shardPath.resolve(STATE_DIR_NAME));
Files.move(leftoverPath.resolve(INDEX_FOLDER_NAME), shardPath.resolve(INDEX_FOLDER_NAME));
}

logger.debug("--> starting another data node with data path [{}]", dataPath);
logger.debug("--> starting another data node with data paths [{}]", dataPaths);
dataNode = internalCluster().startDataOnlyNode(
Settings.builder()
.put(Environment.PATH_DATA_SETTING.getKey(), dataPath.toAbsolutePath().toString())
.putList(Environment.PATH_DATA_SETTING.getKey(),
dataPaths.stream().map(p -> p.toAbsolutePath().toString()).collect(Collectors.toList()))
.putNull(Environment.PATH_SHARED_DATA_SETTING.getKey())
.build());
ensureStableCluster(1 + 1, masterNode);
Expand Down
7 changes: 1 addition & 6 deletions server/src/main/java/org/elasticsearch/env/Environment.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,7 @@ public Environment(final Settings settings, final Path configPath) {

final Settings.Builder finalSettings = Settings.builder().put(settings);
if (PATH_DATA_SETTING.exists(settings)) {
if (dataFiles.length == 1) {
finalSettings.put(PATH_DATA_SETTING.getKey(), dataFiles[0].toString());
} else {
finalSettings.putList(PATH_DATA_SETTING.getKey(),
Arrays.stream(dataFiles).map(Path::toString).collect(Collectors.toList()));
}
finalSettings.putList(PATH_DATA_SETTING.getKey(), Arrays.stream(dataFiles).map(Path::toString).collect(Collectors.toList()));
}
finalSettings.put(PATH_HOME_SETTING.getKey(), homeFile);
finalSettings.put(PATH_LOGS_SETTING.getKey(), logsFile.toString());
Expand Down
12 changes: 9 additions & 3 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -328,9 +328,15 @@ protected Node(final Environment initialEnvironment,
}

if (initialEnvironment.dataFiles().length > 1) {
throw new IllegalArgumentException("Multiple [path.data] values found. Specify a single data path.");
} else if (Environment.dataPathUsesList(tmpSettings)) {
throw new IllegalArgumentException("[path.data] is a list. Specify as a string value.");
// NOTE: we use initialEnvironment here, but assertEquivalent below ensures the data paths do not change
deprecationLogger.critical(DeprecationCategory.SETTINGS, "multiple-data-paths",
"Configuring multiple [path.data] paths is deprecated. Use RAID or other system level features for utilizing " +
"multiple disks. This feature will be removed in a future release.");
}
if (Environment.dataPathUsesList(tmpSettings)) {
// already checked for multiple values above, so if this is a list it is a single valued list
deprecationLogger.critical(DeprecationCategory.SETTINGS, "multiple-data-paths-list",
"Configuring [path.data] with a list is deprecated. Instead specify as a string value.");
}

if (logger.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;

public class ShardPathTests extends ESTestCase {
Expand Down Expand Up @@ -237,9 +237,8 @@ public void testDeleteLeftoverShardDirs() throws IOException {
}
ShardPath.deleteLeftoverShardDirectory(logger, env, lock, idxSettings, shardPaths -> {
List<Path> envPathList = Arrays.asList(envPaths);
assertEquals(envPaths.length, shardPaths.length);
for (Path path : shardPaths) {
assertThat(envPathList, contains(path));
assertThat(envPathList, hasItem(path));
}
});
for (Path path : envPaths) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public final void startClusters() throws Exception {
public List<String> filteredWarnings() {
return Stream.concat(super.filteredWarnings().stream(),
List.of("Configuring multiple [path.data] paths is deprecated. Use RAID or other system level features for utilizing " +
"multiple disks. This feature will be removed in 8.0.").stream()).collect(Collectors.toList());
"multiple disks. This feature will be removed in a future release.").stream()).collect(Collectors.toList());
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1840,7 +1840,7 @@ protected TestCluster buildTestCluster(Scope scope, long seed) throws IOExceptio
return new InternalTestCluster(seed, createTempDir(), supportsDedicatedMasters, getAutoManageMasterNodes(),
minNumDataNodes, maxNumDataNodes,
InternalTestCluster.clusterName(scope.name(), seed) + "-cluster", nodeConfigurationSource, getNumClientNodes(),
nodePrefix, mockPlugins, getClientWrapper(), forbidPrivateIndexSettings());
nodePrefix, mockPlugins, getClientWrapper(), forbidPrivateIndexSettings(), forceSingleDataPath());
}

private NodeConfigurationSource getNodeConfigSource() {
Expand Down Expand Up @@ -2149,6 +2149,13 @@ protected boolean forbidPrivateIndexSettings() {
return true;
}

/**
 * Override to return true in tests that cannot handle multiple data paths.
 * <p>
 * Defaults to {@code false}; the returned value is forwarded to the
 * {@code InternalTestCluster} created in {@code buildTestCluster}, which
 * presumably restricts nodes to a single {@code path.data} entry when true.
 */
protected boolean forceSingleDataPath() {
return false;
}

/**
* Returns an instance of {@link RestClient} pointing to the current test cluster.
* Creates a new client if the method is invoked for the first time in the context of the current test scope.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
import org.apache.lucene.util.TestRuleMarkFailure;
import org.apache.lucene.util.TestUtil;
import org.apache.lucene.util.TimeUnits;
import org.elasticsearch.Version;
import org.elasticsearch.bootstrap.BootstrapForTesting;
Expand Down Expand Up @@ -429,12 +430,15 @@ public void ensureNoWarnings() {
}

/**
 * Returns deprecation-warning messages that tests are allowed to emit without
 * failing {@code ensureNoWarnings()}. The multiple-data-paths and
 * path.data-as-list warnings are always filtered; the no-jdk warning is only
 * filtered when running without a bundled JDK.
 */
protected List<String> filteredWarnings() {
    List<String> filtered = new ArrayList<>();
    filtered.add("Configuring multiple [path.data] paths is deprecated. Use RAID or other system level features for utilizing" +
        " multiple disks. This feature will be removed in a future release.");
    filtered.add("Configuring [path.data] with a list is deprecated. Instead specify as a string value");
    filtered.add("setting [path.shared_data] is deprecated and will be removed in a future release");
    if (JvmInfo.jvmInfo().getBundledJdk() == false) {
        filtered.add("no-jdk distributions that do not bundle a JDK are deprecated and will be removed in a future release");
    }
    return filtered;
}

/**
Expand Down Expand Up @@ -1050,7 +1054,12 @@ public Path getDataPath(String relativePath) {

/** Returns between one and three freshly created temporary directory paths. */
public String[] tmpPaths() {
    final String[] absolutePaths = new String[TestUtil.nextInt(random(), 1, 3)];
    for (int i = 0; i < absolutePaths.length; i++) {
        absolutePaths[i] = createTempDir().toAbsolutePath().toString();
    }
    return absolutePaths;
}

public NodeEnvironment newNodeEnvironment() throws IOException {
Expand All @@ -1061,7 +1070,7 @@ public Settings buildEnvSettings(Settings settings) {
return Settings.builder()
.put(settings)
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath())
.put(Environment.PATH_DATA_SETTING.getKey(), createTempDir().toAbsolutePath()).build();
.putList(Environment.PATH_DATA_SETTING.getKey(), tmpPaths()).build();
}

public NodeEnvironment newNodeEnvironment(Settings settings) throws IOException {
Expand Down
Loading