diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/ElasticsearchNodeCommand.java b/server/src/main/java/org/elasticsearch/cluster/coordination/ElasticsearchNodeCommand.java index 5864244021ee7..1123891c295e3 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/ElasticsearchNodeCommand.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/ElasticsearchNodeCommand.java @@ -43,7 +43,6 @@ import java.util.Arrays; import java.util.EnumSet; import java.util.Map; -import java.util.Objects; public abstract class ElasticsearchNodeCommand extends EnvironmentAwareCommand { private static final Logger logger = LogManager.getLogger(ElasticsearchNodeCommand.class); @@ -131,12 +130,11 @@ public static Tuple loadTermAndClusterState(PersistedCluster protected void processNodePaths(Terminal terminal, OptionSet options, Environment env) throws IOException, UserException { terminal.println(Terminal.Verbosity.VERBOSE, "Obtaining lock for node"); try (NodeEnvironment.NodeLock lock = new NodeEnvironment.NodeLock(logger, env, Files::exists)) { - final Path[] dataPaths = - Arrays.stream(lock.getNodePaths()).filter(Objects::nonNull).map(p -> p.path).toArray(Path[]::new); - if (dataPaths.length == 0) { + final NodeEnvironment.NodePath dataPath = lock.getNodePath(); + if (dataPath == null) { throw new ElasticsearchException(NO_NODE_FOLDER_FOUND_MSG); } - processNodePaths(terminal, dataPaths, options, env); + processNodePaths(terminal, new Path[] { dataPath.path }, options, env); } catch (LockObtainFailedException e) { throw new ElasticsearchException(FAILED_TO_OBTAIN_NODE_LOCK_MSG, e); } diff --git a/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java b/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java index e6dd29f9c7c3f..718fcc8840bce 100644 --- a/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java +++ b/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java @@ -220,8 +220,8 @@ public NodeLock(final Logger logger, } } - public NodePath[] getNodePaths() { - return nodePaths; + public NodePath getNodePath() { + return nodePaths[0]; } @Override @@ -352,67 +352,65 @@ private static boolean upgradeLegacyNodeFolders(Logger logger, Settings settings } // move contents from legacy path to new path - assert nodeLock.getNodePaths().length == legacyNodeLock.getNodePaths().length; try { final List> upgradeActions = new ArrayList<>(); - for (int i = 0; i < legacyNodeLock.getNodePaths().length; i++) { - final NodePath legacyNodePath = legacyNodeLock.getNodePaths()[i]; - final NodePath nodePath = nodeLock.getNodePaths()[i]; - - // determine folders to move and check that there are no extra files/folders - final Set folderNames = new HashSet<>(); - final Set expectedFolderNames = new HashSet<>(Arrays.asList( - - // node state directory, containing MetadataStateFormat-based node metadata as well as cluster state - MetadataStateFormat.STATE_DIR_NAME, - - // indices - INDICES_FOLDER, - - // searchable snapshot cache Lucene index - SNAPSHOT_CACHE_FOLDER - )); - - try (DirectoryStream stream = Files.newDirectoryStream(legacyNodePath.path)) { - for (Path subFolderPath : stream) { - final String fileName = subFolderPath.getFileName().toString(); - if (FileSystemUtils.isDesktopServicesStore(subFolderPath)) { - // ignore - } else if (FileSystemUtils.isAccessibleDirectory(subFolderPath, logger)) { - if (expectedFolderNames.contains(fileName) == false) { - throw new IllegalStateException("unexpected folder encountered during data folder upgrade: " + - subFolderPath); - } - final Path targetSubFolderPath = nodePath.path.resolve(fileName); - if (Files.exists(targetSubFolderPath)) { - throw new IllegalStateException("target folder already exists during data folder upgrade: " + - targetSubFolderPath); - } - folderNames.add(fileName); - } else if (fileName.equals(NODE_LOCK_FILENAME) == false && - fileName.equals(TEMP_FILE_NAME) == false) { - throw new IllegalStateException("unexpected file/folder encountered during data folder upgrade: " + + final NodePath legacyNodePath = legacyNodeLock.getNodePath(); + final NodePath nodePath = nodeLock.getNodePath(); + + // determine folders to move and check that there are no extra files/folders + final Set folderNames = new HashSet<>(); + final Set expectedFolderNames = new HashSet<>(Arrays.asList( + + // node state directory, containing MetadataStateFormat-based node metadata as well as cluster state + MetadataStateFormat.STATE_DIR_NAME, + + // indices + INDICES_FOLDER, + + // searchable snapshot cache Lucene index + SNAPSHOT_CACHE_FOLDER + )); + + try (DirectoryStream stream = Files.newDirectoryStream(legacyNodePath.path)) { + for (Path subFolderPath : stream) { + final String fileName = subFolderPath.getFileName().toString(); + if (FileSystemUtils.isDesktopServicesStore(subFolderPath)) { + // ignore + } else if (FileSystemUtils.isAccessibleDirectory(subFolderPath, logger)) { + if (expectedFolderNames.contains(fileName) == false) { + throw new IllegalStateException("unexpected folder encountered during data folder upgrade: " + subFolderPath); } + final Path targetSubFolderPath = nodePath.path.resolve(fileName); + if (Files.exists(targetSubFolderPath)) { + throw new IllegalStateException("target folder already exists during data folder upgrade: " + + targetSubFolderPath); + } + folderNames.add(fileName); + } else if (fileName.equals(NODE_LOCK_FILENAME) == false && + fileName.equals(TEMP_FILE_NAME) == false) { + throw new IllegalStateException("unexpected file/folder encountered during data folder upgrade: " + + subFolderPath); } } + } - assert Sets.difference(folderNames, expectedFolderNames).isEmpty() : - "expected indices and/or state dir folder but was " + folderNames; + assert Sets.difference(folderNames, expectedFolderNames).isEmpty() : + "expected indices and/or state dir folder but was " + folderNames; + + upgradeActions.add(() -> { + for (String folderName : folderNames) { + final Path sourceSubFolderPath = legacyNodePath.path.resolve(folderName); + final Path targetSubFolderPath = nodePath.path.resolve(folderName); + Files.move(sourceSubFolderPath, targetSubFolderPath, StandardCopyOption.ATOMIC_MOVE); + logger.info("data folder upgrade: moved from [{}] to [{}]", sourceSubFolderPath, targetSubFolderPath); + } + IOUtils.fsync(nodePath.path, true); + }); - upgradeActions.add(() -> { - for (String folderName : folderNames) { - final Path sourceSubFolderPath = legacyNodePath.path.resolve(folderName); - final Path targetSubFolderPath = nodePath.path.resolve(folderName); - Files.move(sourceSubFolderPath, targetSubFolderPath, StandardCopyOption.ATOMIC_MOVE); - logger.info("data folder upgrade: moved from [{}] to [{}]", sourceSubFolderPath, targetSubFolderPath); - } - IOUtils.fsync(nodePath.path, true); - }); - } // now do the actual upgrade. start by upgrading the node metadata file before moving anything, since a downgrade in an // intermediate state would be pretty disastrous - loadNodeMetadata(settings, logger, legacyNodeLock.getNodePaths()); + loadNodeMetadata(settings, logger, legacyNodeLock.getNodePath()); for (CheckedRunnable upgradeAction : upgradeActions) { upgradeAction.run(); } @@ -920,12 +918,12 @@ public String nodeId() { /** * Returns an array of all of the {@link NodePath}s. */ - public NodePath[] nodePaths() { + public NodePath nodePath() { assertEnvIsLocked(); if (nodePaths == null || locks == null) { throw new IllegalStateException("node is not configured to store local location"); } - return nodePaths; + return nodePaths[0]; } /** diff --git a/server/src/main/java/org/elasticsearch/index/shard/ShardPath.java b/server/src/main/java/org/elasticsearch/index/shard/ShardPath.java index c2b44c5e2bc3c..ea8203556ccb2 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/ShardPath.java +++ b/server/src/main/java/org/elasticsearch/index/shard/ShardPath.java @@ -9,20 +9,16 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.util.Strings; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.ShardLock; import org.elasticsearch.index.IndexSettings; import java.io.IOException; -import java.math.BigInteger; -import java.nio.file.FileStore; import java.nio.file.Files; import java.nio.file.Path; -import java.util.Arrays; -import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.function.Consumer; @@ -192,85 +188,13 @@ public static ShardPath selectNewPathForShard(NodeEnvironment env, ShardId shard if (indexSettings.hasCustomDataPath()) { dataPath = env.resolveCustomLocation(indexSettings.customDataPath(), shardId); - statePath = env.nodePaths()[0].resolve(shardId); + statePath = env.nodePath().resolve(shardId); } else { - BigInteger totFreeSpace = BigInteger.ZERO; - for (NodeEnvironment.NodePath nodePath : env.nodePaths()) { - totFreeSpace = totFreeSpace.add(BigInteger.valueOf(nodePath.fileStore.getUsableSpace())); - } - - // TODO: this is a hack!! We should instead keep track of incoming (relocated) shards since we know - // how large they will be once they're done copying, instead of a silly guess for such cases: - - // Very rough heuristic of how much disk space we expect the shard will use over its lifetime, the max of current average - // shard size across the cluster and 5% of the total available free space on this node: - BigInteger estShardSizeInBytes = BigInteger.valueOf(avgShardSizeInBytes).max(totFreeSpace.divide(BigInteger.valueOf(20))); - - // TODO - do we need something more extensible? Yet, this does the job for now... - final NodeEnvironment.NodePath[] paths = env.nodePaths(); - - // If no better path is chosen, use the one with the most space by default - NodeEnvironment.NodePath bestPath = getPathWithMostFreeSpace(env); - - if (paths.length != 1) { - Map pathToShardCount = env.shardCountPerPath(shardId.getIndex()); - - // Compute how much space there is on each path - final Map pathsToSpace = new HashMap<>(paths.length); - for (NodeEnvironment.NodePath nodePath : paths) { - FileStore fileStore = nodePath.fileStore; - BigInteger usableBytes = BigInteger.valueOf(fileStore.getUsableSpace()); - pathsToSpace.put(nodePath, usableBytes); - } - - bestPath = Arrays.stream(paths) - // Filter out paths that have enough space - .filter((path) -> pathsToSpace.get(path).subtract(estShardSizeInBytes).compareTo(BigInteger.ZERO) > 0) - // Sort by the number of shards for this index - .sorted((p1, p2) -> { - int cmp = Long.compare(pathToShardCount.getOrDefault(p1, 0L), - pathToShardCount.getOrDefault(p2, 0L)); - if (cmp == 0) { - // if the number of shards is equal, tie-break with the number of total shards - cmp = Integer.compare(dataPathToShardCount.getOrDefault(p1.path, 0), - dataPathToShardCount.getOrDefault(p2.path, 0)); - if (cmp == 0) { - // if the number of shards is equal, tie-break with the usable bytes - cmp = pathsToSpace.get(p2).compareTo(pathsToSpace.get(p1)); - } - } - return cmp; - }) - // Return the first result - .findFirst() - // Or the existing best path if there aren't any that fit the criteria - .orElse(bestPath); - } - - statePath = bestPath.resolve(shardId); - dataPath = statePath; + dataPath = statePath = env.nodePath().resolve(shardId); } return new ShardPath(indexSettings.hasCustomDataPath(), dataPath, statePath, shardId); } - static NodeEnvironment.NodePath getPathWithMostFreeSpace(NodeEnvironment env) throws IOException { - final NodeEnvironment.NodePath[] paths = env.nodePaths(); - NodeEnvironment.NodePath bestPath = null; - long maxUsableBytes = Long.MIN_VALUE; - for (NodeEnvironment.NodePath nodePath : paths) { - FileStore fileStore = nodePath.fileStore; - long usableBytes = fileStore.getUsableSpace(); // NB usable bytes doesn't account for reserved space (e.g. incoming recoveries) - assert usableBytes >= 0 : "usable bytes must be >= 0, got: " + usableBytes; - - if (bestPath == null || usableBytes > maxUsableBytes) { - // This path has been determined to be "better" based on the usable bytes - maxUsableBytes = usableBytes; - bestPath = nodePath; - } - } - return bestPath; - } - @Override public boolean equals(Object o) { if (this == o) { diff --git a/server/src/main/java/org/elasticsearch/monitor/fs/FsProbe.java b/server/src/main/java/org/elasticsearch/monitor/fs/FsProbe.java index bed40f81c595b..d7ec1e98b8ab0 100644 --- a/server/src/main/java/org/elasticsearch/monitor/fs/FsProbe.java +++ b/server/src/main/java/org/elasticsearch/monitor/fs/FsProbe.java @@ -42,22 +42,17 @@ public FsInfo stats(FsInfo previous) throws IOException { if (nodeEnv.hasNodeFile() == false) { return new FsInfo(System.currentTimeMillis(), null, new FsInfo.Path[0]); } - NodePath[] dataLocations = nodeEnv.nodePaths(); - FsInfo.Path[] paths = new FsInfo.Path[dataLocations.length]; - for (int i = 0; i < dataLocations.length; i++) { - paths[i] = getFSInfo(dataLocations[i]); - } + NodePath dataLocation = nodeEnv.nodePath(); + FsInfo.Path pathInfo = getFSInfo(dataLocation); FsInfo.IoStats ioStats = null; if (Constants.LINUX) { Set> devicesNumbers = new HashSet<>(); - for (int i = 0; i < dataLocations.length; i++) { - if (dataLocations[i].majorDeviceNumber != -1 && dataLocations[i].minorDeviceNumber != -1) { - devicesNumbers.add(Tuple.tuple(dataLocations[i].majorDeviceNumber, dataLocations[i].minorDeviceNumber)); - } + if (dataLocation.majorDeviceNumber != -1 && dataLocation.minorDeviceNumber != -1) { + devicesNumbers.add(Tuple.tuple(dataLocation.majorDeviceNumber, dataLocation.minorDeviceNumber)); } ioStats = ioStats(devicesNumbers, previous); } - return new FsInfo(System.currentTimeMillis(), ioStats, paths); + return new FsInfo(System.currentTimeMillis(), ioStats, new FsInfo.Path[] { pathInfo }); } final FsInfo.IoStats ioStats(final Set> devicesNumbers, final FsInfo previous) { diff --git a/server/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java b/server/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java index 4402e7c175896..31c873f2a6ece 100644 --- a/server/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java +++ b/server/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java @@ -212,7 +212,7 @@ public void testDeleteSafe() throws Exception { SetOnce listener = new SetOnce<>(); env.deleteShardDirectorySafe(new ShardId(index, 1), idxSettings, listener::set); Path deletedPath = listener.get(); - assertThat(deletedPath, equalTo(env.nodePaths()[0].resolve(index).resolve("1"))); + assertThat(deletedPath, equalTo(env.nodePath().resolve(index).resolve("1"))); } path = env.indexPath(index); diff --git a/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java index b5875faf465cf..b8e79b0049f74 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java @@ -55,9 +55,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import java.util.Arrays; import java.util.Collections; -import java.util.Objects; import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -133,8 +131,8 @@ public void setup() throws IOException { clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(Metadata.builder().put(indexMetadata, false).build()).build(); try (NodeEnvironment.NodeLock lock = new NodeEnvironment.NodeLock(logger, environment, Files::exists)) { - final Path[] dataPaths = Arrays.stream(lock.getNodePaths()).filter(Objects::nonNull).map(p -> p.path).toArray(Path[]::new); - try (PersistedClusterStateService.Writer writer = new PersistedClusterStateService(dataPaths, nodeId, + final NodeEnvironment.NodePath dataPath = lock.getNodePath(); + try (PersistedClusterStateService.Writer writer = new PersistedClusterStateService(new Path[] { dataPath.path }, nodeId, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L).createWriter()) { writer.writeFullStateAndCommit(1L, clusterState); diff --git a/x-pack/plugin/autoscaling/src/internalClusterTest/java/org/elasticsearch/xpack/autoscaling/action/TransportGetAutoscalingCapacityActionIT.java b/x-pack/plugin/autoscaling/src/internalClusterTest/java/org/elasticsearch/xpack/autoscaling/action/TransportGetAutoscalingCapacityActionIT.java index 0e1372aa1d773..31f25e9eaf5a9 100644 --- a/x-pack/plugin/autoscaling/src/internalClusterTest/java/org/elasticsearch/xpack/autoscaling/action/TransportGetAutoscalingCapacityActionIT.java +++ b/x-pack/plugin/autoscaling/src/internalClusterTest/java/org/elasticsearch/xpack/autoscaling/action/TransportGetAutoscalingCapacityActionIT.java @@ -37,7 +37,7 @@ public class TransportGetAutoscalingCapacityActionIT extends AutoscalingIntegTes public void testCurrentCapacity() throws Exception { assertThat(capacity().results().keySet(), Matchers.empty()); long memory = OsProbe.getInstance().getTotalPhysicalMemorySize(); - long storage = internalCluster().getInstance(NodeEnvironment.class).nodePaths()[0].fileStore.getTotalSpace(); + long storage = internalCluster().getInstance(NodeEnvironment.class).nodePath().fileStore.getTotalSpace(); assertThat(memory, greaterThan(0L)); assertThat(storage, greaterThan(0L)); putAutoscalingPolicy("test"); diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/PersistentCache.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/PersistentCache.java index 13762838011c2..80c2b111e15f8 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/PersistentCache.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/PersistentCache.java @@ -336,10 +336,8 @@ private static List createWriters(NodeEnvironment nodeEnvironm final List writers = new ArrayList<>(); boolean success = false; try { - final NodeEnvironment.NodePath[] nodePaths = nodeEnvironment.nodePaths(); - for (NodeEnvironment.NodePath nodePath : nodePaths) { - writers.add(createCacheIndexWriter(nodePath)); - } + final NodeEnvironment.NodePath nodePath = nodeEnvironment.nodePath(); + writers.add(createCacheIndexWriter(nodePath)); success = true; } catch (IOException e) { throw new UncheckedIOException("Failed to create persistent cache writers", e); @@ -395,11 +393,10 @@ static CacheIndexWriter createCacheIndexWriter(NodeEnvironment.NodePath nodePath static Map loadDocuments(NodeEnvironment nodeEnvironment) { final Map documents = new HashMap<>(); try { - for (NodeEnvironment.NodePath nodePath : nodeEnvironment.nodePaths()) { - final Path directoryPath = resolveCacheIndexFolder(nodePath); - if (Files.exists(directoryPath)) { - documents.putAll(loadDocuments(directoryPath)); - } + NodeEnvironment.NodePath nodePath = nodeEnvironment.nodePath(); + final Path directoryPath = resolveCacheIndexFolder(nodePath); + if (Files.exists(directoryPath)) { + documents.putAll(loadDocuments(directoryPath)); } } catch (IOException e) { throw new UncheckedIOException("Failed to load existing documents from persistent cache index", e); @@ -449,23 +446,22 @@ public static void cleanUp(Settings settings, NodeEnvironment nodeEnvironment) { throw new IllegalStateException("Cannot clean searchable snapshot caches: node is a data node"); } try { - for (NodeEnvironment.NodePath nodePath : nodeEnvironment.nodePaths()) { - for (String indexUUID : nodeEnvironment.availableIndexFoldersForPath(nodePath)) { - for (ShardId shardId : nodeEnvironment.findAllShardIds(new Index("_unknown_", indexUUID))) { - final Path shardDataPath = nodePath.resolve(shardId); - final ShardPath shardPath = new ShardPath(false, shardDataPath, shardDataPath, shardId); - final Path cacheDir = getShardCachePath(shardPath); - if (Files.isDirectory(cacheDir)) { - logger.debug("deleting searchable snapshot shard cache directory [{}]", cacheDir); - IOUtils.rm(cacheDir); - } + NodeEnvironment.NodePath nodePath = nodeEnvironment.nodePath(); + for (String indexUUID : nodeEnvironment.availableIndexFoldersForPath(nodePath)) { + for (ShardId shardId : nodeEnvironment.findAllShardIds(new Index("_unknown_", indexUUID))) { + final Path shardDataPath = nodePath.resolve(shardId); + final ShardPath shardPath = new ShardPath(false, shardDataPath, shardDataPath, shardId); + final Path cacheDir = getShardCachePath(shardPath); + if (Files.isDirectory(cacheDir)) { + logger.debug("deleting searchable snapshot shard cache directory [{}]", cacheDir); + IOUtils.rm(cacheDir); } } - final Path cacheIndexDir = resolveCacheIndexFolder(nodePath); - if (Files.isDirectory(cacheIndexDir)) { - logger.debug("deleting searchable snapshot lucene directory [{}]", cacheIndexDir); - IOUtils.rm(cacheIndexDir); - } + } + final Path cacheIndexDir = resolveCacheIndexFolder(nodePath); + if (Files.isDirectory(cacheIndexDir)) { + logger.debug("deleting searchable snapshot lucene directory [{}]", cacheIndexDir); + IOUtils.rm(cacheIndexDir); } } catch (IOException e) { throw new UncheckedIOException("Failed to clean up searchable snapshots cache", e); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/PersistentCacheTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/PersistentCacheTests.java index cf5483e95c295..6b82945fd35ba 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/PersistentCacheTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/PersistentCacheTests.java @@ -64,7 +64,7 @@ public class PersistentCacheTests extends AbstractSearchableSnapshotsTestCase { public void testCacheIndexWriter() throws Exception { - final NodeEnvironment.NodePath nodePath = randomFrom(nodeEnvironment.nodePaths()); + final NodeEnvironment.NodePath nodePath = nodeEnvironment.nodePath(); int docId = 0; final Map liveDocs = new HashMap<>();