Make nodePaths() singular (elastic#72514)
This commit converts the nodePaths() methods in NodeEnvironment to be
singular.

relates elastic#71205
rjernst authored May 2, 2021
1 parent 6b116df commit 2dc796a
Showing 9 changed files with 89 additions and 180 deletions.
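
The change is mechanical at call sites: indexing into the array returned by nodePaths() becomes a direct nodePath() call. Below is a minimal caller sketch against the new API; the wrapper class and method names are illustrative, not part of this commit, and the old call shape is shown in a comment.

import java.nio.file.Path;

import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.shard.ShardId;

// Illustrative only: resolving a shard's state path after this commit.
class NodePathMigrationSketch {
    static Path statePathFor(NodeEnvironment env, ShardId shardId) {
        // Before this commit, callers wrote: env.nodePaths()[0].resolve(shardId)
        // nodePath() throws IllegalStateException if the node stores no local data.
        NodeEnvironment.NodePath nodePath = env.nodePath();
        return nodePath.resolve(shardId);
    }
}

The same shape applies to NodeEnvironment.NodeLock, whose getNodePaths() becomes getNodePath() and returns a single, possibly null, NodePath.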
@@ -43,7 +43,6 @@
 import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.Map;
-import java.util.Objects;
 
 public abstract class ElasticsearchNodeCommand extends EnvironmentAwareCommand {
     private static final Logger logger = LogManager.getLogger(ElasticsearchNodeCommand.class);
@@ -131,12 +130,11 @@ public static Tuple<Long, ClusterState> loadTermAndClusterState(PersistedCluster
     protected void processNodePaths(Terminal terminal, OptionSet options, Environment env) throws IOException, UserException {
         terminal.println(Terminal.Verbosity.VERBOSE, "Obtaining lock for node");
         try (NodeEnvironment.NodeLock lock = new NodeEnvironment.NodeLock(logger, env, Files::exists)) {
-            final Path[] dataPaths =
-                Arrays.stream(lock.getNodePaths()).filter(Objects::nonNull).map(p -> p.path).toArray(Path[]::new);
-            if (dataPaths.length == 0) {
+            final NodeEnvironment.NodePath dataPath = lock.getNodePath();
+            if (dataPath == null) {
                 throw new ElasticsearchException(NO_NODE_FOLDER_FOUND_MSG);
             }
-            processNodePaths(terminal, dataPaths, options, env);
+            processNodePaths(terminal, new Path[] { dataPath.path }, options, env);
         } catch (LockObtainFailedException e) {
             throw new ElasticsearchException(FAILED_TO_OBTAIN_NODE_LOCK_MSG, e);
         }
108 changes: 53 additions & 55 deletions server/src/main/java/org/elasticsearch/env/NodeEnvironment.java
@@ -220,8 +220,8 @@ public NodeLock(final Logger logger,
             }
         }
 
-        public NodePath[] getNodePaths() {
-            return nodePaths;
+        public NodePath getNodePath() {
+            return nodePaths[0];
         }
 
         @Override
@@ -352,67 +352,65 @@ private static boolean upgradeLegacyNodeFolders(Logger logger, Settings settings
         }
 
         // move contents from legacy path to new path
-        assert nodeLock.getNodePaths().length == legacyNodeLock.getNodePaths().length;
         try {
             final List<CheckedRunnable<IOException>> upgradeActions = new ArrayList<>();
-            for (int i = 0; i < legacyNodeLock.getNodePaths().length; i++) {
-                final NodePath legacyNodePath = legacyNodeLock.getNodePaths()[i];
-                final NodePath nodePath = nodeLock.getNodePaths()[i];
-
-                // determine folders to move and check that there are no extra files/folders
-                final Set<String> folderNames = new HashSet<>();
-                final Set<String> expectedFolderNames = new HashSet<>(Arrays.asList(
-
-                    // node state directory, containing MetadataStateFormat-based node metadata as well as cluster state
-                    MetadataStateFormat.STATE_DIR_NAME,
-
-                    // indices
-                    INDICES_FOLDER,
-
-                    // searchable snapshot cache Lucene index
-                    SNAPSHOT_CACHE_FOLDER
-                ));
-
-                try (DirectoryStream<Path> stream = Files.newDirectoryStream(legacyNodePath.path)) {
-                    for (Path subFolderPath : stream) {
-                        final String fileName = subFolderPath.getFileName().toString();
-                        if (FileSystemUtils.isDesktopServicesStore(subFolderPath)) {
-                            // ignore
-                        } else if (FileSystemUtils.isAccessibleDirectory(subFolderPath, logger)) {
-                            if (expectedFolderNames.contains(fileName) == false) {
-                                throw new IllegalStateException("unexpected folder encountered during data folder upgrade: " +
-                                    subFolderPath);
-                            }
-                            final Path targetSubFolderPath = nodePath.path.resolve(fileName);
-                            if (Files.exists(targetSubFolderPath)) {
-                                throw new IllegalStateException("target folder already exists during data folder upgrade: " +
-                                    targetSubFolderPath);
-                            }
-                            folderNames.add(fileName);
-                        } else if (fileName.equals(NODE_LOCK_FILENAME) == false &&
-                            fileName.equals(TEMP_FILE_NAME) == false) {
-                            throw new IllegalStateException("unexpected file/folder encountered during data folder upgrade: " +
-                                subFolderPath);
-                        }
-                    }
-                }
-
-                assert Sets.difference(folderNames, expectedFolderNames).isEmpty() :
-                    "expected indices and/or state dir folder but was " + folderNames;
-
-                upgradeActions.add(() -> {
-                    for (String folderName : folderNames) {
-                        final Path sourceSubFolderPath = legacyNodePath.path.resolve(folderName);
-                        final Path targetSubFolderPath = nodePath.path.resolve(folderName);
-                        Files.move(sourceSubFolderPath, targetSubFolderPath, StandardCopyOption.ATOMIC_MOVE);
-                        logger.info("data folder upgrade: moved from [{}] to [{}]", sourceSubFolderPath, targetSubFolderPath);
-                    }
-                    IOUtils.fsync(nodePath.path, true);
-                });
-            }
+            final NodePath legacyNodePath = legacyNodeLock.getNodePath();
+            final NodePath nodePath = nodeLock.getNodePath();
+
+            // determine folders to move and check that there are no extra files/folders
+            final Set<String> folderNames = new HashSet<>();
+            final Set<String> expectedFolderNames = new HashSet<>(Arrays.asList(
+
+                // node state directory, containing MetadataStateFormat-based node metadata as well as cluster state
+                MetadataStateFormat.STATE_DIR_NAME,
+
+                // indices
+                INDICES_FOLDER,
+
+                // searchable snapshot cache Lucene index
+                SNAPSHOT_CACHE_FOLDER
+            ));
+
+            try (DirectoryStream<Path> stream = Files.newDirectoryStream(legacyNodePath.path)) {
+                for (Path subFolderPath : stream) {
+                    final String fileName = subFolderPath.getFileName().toString();
+                    if (FileSystemUtils.isDesktopServicesStore(subFolderPath)) {
+                        // ignore
+                    } else if (FileSystemUtils.isAccessibleDirectory(subFolderPath, logger)) {
+                        if (expectedFolderNames.contains(fileName) == false) {
+                            throw new IllegalStateException("unexpected folder encountered during data folder upgrade: " +
+                                subFolderPath);
+                        }
+                        final Path targetSubFolderPath = nodePath.path.resolve(fileName);
+                        if (Files.exists(targetSubFolderPath)) {
+                            throw new IllegalStateException("target folder already exists during data folder upgrade: " +
+                                targetSubFolderPath);
+                        }
+                        folderNames.add(fileName);
+                    } else if (fileName.equals(NODE_LOCK_FILENAME) == false &&
+                        fileName.equals(TEMP_FILE_NAME) == false) {
+                        throw new IllegalStateException("unexpected file/folder encountered during data folder upgrade: " +
+                            subFolderPath);
+                    }
+                }
+            }
+
+            assert Sets.difference(folderNames, expectedFolderNames).isEmpty() :
+                "expected indices and/or state dir folder but was " + folderNames;
+
+            upgradeActions.add(() -> {
+                for (String folderName : folderNames) {
+                    final Path sourceSubFolderPath = legacyNodePath.path.resolve(folderName);
+                    final Path targetSubFolderPath = nodePath.path.resolve(folderName);
+                    Files.move(sourceSubFolderPath, targetSubFolderPath, StandardCopyOption.ATOMIC_MOVE);
+                    logger.info("data folder upgrade: moved from [{}] to [{}]", sourceSubFolderPath, targetSubFolderPath);
+                }
+                IOUtils.fsync(nodePath.path, true);
+            });
             // now do the actual upgrade. start by upgrading the node metadata file before moving anything, since a downgrade in an
             // intermediate state would be pretty disastrous
-            loadNodeMetadata(settings, logger, legacyNodeLock.getNodePaths());
+            loadNodeMetadata(settings, logger, legacyNodeLock.getNodePath());
             for (CheckedRunnable<IOException> upgradeAction : upgradeActions) {
                 upgradeAction.run();
             }
@@ -920,12 +918,12 @@ public String nodeId() {
     /**
      * Returns an array of all of the {@link NodePath}s.
      */
-    public NodePath[] nodePaths() {
+    public NodePath nodePath() {
         assertEnvIsLocked();
         if (nodePaths == null || locks == null) {
             throw new IllegalStateException("node is not configured to store local location");
         }
-        return nodePaths;
+        return nodePaths[0];
     }
 
     /**
82 changes: 3 additions & 79 deletions server/src/main/java/org/elasticsearch/index/shard/ShardPath.java
@@ -9,20 +9,16 @@
 
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.util.Strings;
-import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.env.ShardLock;
 import org.elasticsearch.index.IndexSettings;
 
 import java.io.IOException;
 import java.math.BigInteger;
-import java.nio.file.FileStore;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.Arrays;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.function.Consumer;
@@ -192,85 +188,13 @@ public static ShardPath selectNewPathForShard(NodeEnvironment env, ShardId shard
 
         if (indexSettings.hasCustomDataPath()) {
             dataPath = env.resolveCustomLocation(indexSettings.customDataPath(), shardId);
-            statePath = env.nodePaths()[0].resolve(shardId);
+            statePath = env.nodePath().resolve(shardId);
         } else {
-            BigInteger totFreeSpace = BigInteger.ZERO;
-            for (NodeEnvironment.NodePath nodePath : env.nodePaths()) {
-                totFreeSpace = totFreeSpace.add(BigInteger.valueOf(nodePath.fileStore.getUsableSpace()));
-            }
-
-            // TODO: this is a hack!! We should instead keep track of incoming (relocated) shards since we know
-            // how large they will be once they're done copying, instead of a silly guess for such cases:
-
-            // Very rough heuristic of how much disk space we expect the shard will use over its lifetime, the max of current average
-            // shard size across the cluster and 5% of the total available free space on this node:
-            BigInteger estShardSizeInBytes = BigInteger.valueOf(avgShardSizeInBytes).max(totFreeSpace.divide(BigInteger.valueOf(20)));
-
-            // TODO - do we need something more extensible? Yet, this does the job for now...
-            final NodeEnvironment.NodePath[] paths = env.nodePaths();
-
-            // If no better path is chosen, use the one with the most space by default
-            NodeEnvironment.NodePath bestPath = getPathWithMostFreeSpace(env);
-
-            if (paths.length != 1) {
-                Map<NodeEnvironment.NodePath, Long> pathToShardCount = env.shardCountPerPath(shardId.getIndex());
-
-                // Compute how much space there is on each path
-                final Map<NodeEnvironment.NodePath, BigInteger> pathsToSpace = new HashMap<>(paths.length);
-                for (NodeEnvironment.NodePath nodePath : paths) {
-                    FileStore fileStore = nodePath.fileStore;
-                    BigInteger usableBytes = BigInteger.valueOf(fileStore.getUsableSpace());
-                    pathsToSpace.put(nodePath, usableBytes);
-                }
-
-                bestPath = Arrays.stream(paths)
-                    // Filter out paths that have enough space
-                    .filter((path) -> pathsToSpace.get(path).subtract(estShardSizeInBytes).compareTo(BigInteger.ZERO) > 0)
-                    // Sort by the number of shards for this index
-                    .sorted((p1, p2) -> {
-                        int cmp = Long.compare(pathToShardCount.getOrDefault(p1, 0L),
-                            pathToShardCount.getOrDefault(p2, 0L));
-                        if (cmp == 0) {
-                            // if the number of shards is equal, tie-break with the number of total shards
-                            cmp = Integer.compare(dataPathToShardCount.getOrDefault(p1.path, 0),
-                                dataPathToShardCount.getOrDefault(p2.path, 0));
-                            if (cmp == 0) {
-                                // if the number of shards is equal, tie-break with the usable bytes
-                                cmp = pathsToSpace.get(p2).compareTo(pathsToSpace.get(p1));
-                            }
-                        }
-                        return cmp;
-                    })
-                    // Return the first result
-                    .findFirst()
-                    // Or the existing best path if there aren't any that fit the criteria
-                    .orElse(bestPath);
-            }
-
-            statePath = bestPath.resolve(shardId);
-            dataPath = statePath;
+            dataPath = statePath = env.nodePath().resolve(shardId);
         }
         return new ShardPath(indexSettings.hasCustomDataPath(), dataPath, statePath, shardId);
     }
 
-    static NodeEnvironment.NodePath getPathWithMostFreeSpace(NodeEnvironment env) throws IOException {
-        final NodeEnvironment.NodePath[] paths = env.nodePaths();
-        NodeEnvironment.NodePath bestPath = null;
-        long maxUsableBytes = Long.MIN_VALUE;
-        for (NodeEnvironment.NodePath nodePath : paths) {
-            FileStore fileStore = nodePath.fileStore;
-            long usableBytes = fileStore.getUsableSpace(); // NB usable bytes doesn't account for reserved space (e.g. incoming recoveries)
-            assert usableBytes >= 0 : "usable bytes must be >= 0, got: " + usableBytes;
-
-            if (bestPath == null || usableBytes > maxUsableBytes) {
-                // This path has been determined to be "better" based on the usable bytes
-                maxUsableBytes = usableBytes;
-                bestPath = nodePath;
-            }
-        }
-        return bestPath;
-    }
-
     @Override
     public boolean equals(Object o) {
         if (this == o) {
15 changes: 5 additions & 10 deletions server/src/main/java/org/elasticsearch/monitor/fs/FsProbe.java
@@ -42,22 +42,17 @@ public FsInfo stats(FsInfo previous) throws IOException {
         if (nodeEnv.hasNodeFile() == false) {
             return new FsInfo(System.currentTimeMillis(), null, new FsInfo.Path[0]);
         }
-        NodePath[] dataLocations = nodeEnv.nodePaths();
-        FsInfo.Path[] paths = new FsInfo.Path[dataLocations.length];
-        for (int i = 0; i < dataLocations.length; i++) {
-            paths[i] = getFSInfo(dataLocations[i]);
-        }
+        NodePath dataLocation = nodeEnv.nodePath();
+        FsInfo.Path pathInfo = getFSInfo(dataLocation);
         FsInfo.IoStats ioStats = null;
         if (Constants.LINUX) {
             Set<Tuple<Integer, Integer>> devicesNumbers = new HashSet<>();
-            for (int i = 0; i < dataLocations.length; i++) {
-                if (dataLocations[i].majorDeviceNumber != -1 && dataLocations[i].minorDeviceNumber != -1) {
-                    devicesNumbers.add(Tuple.tuple(dataLocations[i].majorDeviceNumber, dataLocations[i].minorDeviceNumber));
-                }
+            if (dataLocation.majorDeviceNumber != -1 && dataLocation.minorDeviceNumber != -1) {
+                devicesNumbers.add(Tuple.tuple(dataLocation.majorDeviceNumber, dataLocation.minorDeviceNumber));
             }
             ioStats = ioStats(devicesNumbers, previous);
         }
-        return new FsInfo(System.currentTimeMillis(), ioStats, paths);
+        return new FsInfo(System.currentTimeMillis(), ioStats, new FsInfo.Path[] { pathInfo });
     }
 
     final FsInfo.IoStats ioStats(final Set<Tuple<Integer, Integer>> devicesNumbers, final FsInfo previous) {
@@ -212,7 +212,7 @@ public void testDeleteSafe() throws Exception {
             SetOnce<Path> listener = new SetOnce<>();
             env.deleteShardDirectorySafe(new ShardId(index, 1), idxSettings, listener::set);
             Path deletedPath = listener.get();
-            assertThat(deletedPath, equalTo(env.nodePaths()[0].resolve(index).resolve("1")));
+            assertThat(deletedPath, equalTo(env.nodePath().resolve(index).resolve("1")));
         }
 
         path = env.indexPath(index);
@@ -55,9 +55,7 @@
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.Objects;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -133,8 +131,8 @@ public void setup() throws IOException {
         clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(Metadata.builder().put(indexMetadata, false).build()).build();
 
         try (NodeEnvironment.NodeLock lock = new NodeEnvironment.NodeLock(logger, environment, Files::exists)) {
-            final Path[] dataPaths = Arrays.stream(lock.getNodePaths()).filter(Objects::nonNull).map(p -> p.path).toArray(Path[]::new);
-            try (PersistedClusterStateService.Writer writer = new PersistedClusterStateService(dataPaths, nodeId,
+            final NodeEnvironment.NodePath dataPath = lock.getNodePath();
+            try (PersistedClusterStateService.Writer writer = new PersistedClusterStateService(new Path[] { dataPath.path }, nodeId,
                 xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
                 new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L).createWriter()) {
                 writer.writeFullStateAndCommit(1L, clusterState);
@@ -37,7 +37,7 @@ public class TransportGetAutoscalingCapacityActionIT extends AutoscalingIntegTes
     public void testCurrentCapacity() throws Exception {
         assertThat(capacity().results().keySet(), Matchers.empty());
         long memory = OsProbe.getInstance().getTotalPhysicalMemorySize();
-        long storage = internalCluster().getInstance(NodeEnvironment.class).nodePaths()[0].fileStore.getTotalSpace();
+        long storage = internalCluster().getInstance(NodeEnvironment.class).nodePath().fileStore.getTotalSpace();
         assertThat(memory, greaterThan(0L));
         assertThat(storage, greaterThan(0L));
         putAutoscalingPolicy("test");