Skip to content

Commit

Permalink
Revert "Make NodeEnvironment.nodeDataPaths singular (#72432)" (#78861)
Browse files Browse the repository at this point in the history
This reverts commit 6a7298e.

The revert was not clean. There were two adjustements necessary.
First, this effectively must revert #73136. Second, some tests
were written after MDP removal so needed to be made multi path aware.

relates #78525
relates #71205
  • Loading branch information
rjernst authored Oct 11, 2021
1 parent c5e04e1 commit 5744ed5
Show file tree
Hide file tree
Showing 21 changed files with 184 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public void testBootstrapNoClusterState() throws IOException {
internalCluster().stopRandomDataNode();
Environment environment = TestEnvironment.newEnvironment(
Settings.builder().put(internalCluster().getDefaultSettings()).put(dataPathSettings).build());
PersistedClusterStateService.deleteAll(nodeEnvironment.nodeDataPath());
PersistedClusterStateService.deleteAll(nodeEnvironment.nodeDataPaths());

expectThrows(() -> unsafeBootstrap(environment), ElasticsearchNodeCommand.NO_NODE_METADATA_FOUND_MSG);
}
Expand All @@ -170,7 +170,7 @@ public void testDetachNoClusterState() throws IOException {
internalCluster().stopRandomDataNode();
Environment environment = TestEnvironment.newEnvironment(
Settings.builder().put(internalCluster().getDefaultSettings()).put(dataPathSettings).build());
PersistedClusterStateService.deleteAll(nodeEnvironment.nodeDataPath());
PersistedClusterStateService.deleteAll(nodeEnvironment.nodeDataPaths());

expectThrows(() -> detachCluster(environment), ElasticsearchNodeCommand.NO_NODE_METADATA_FOUND_MSG);
}
Expand Down Expand Up @@ -252,7 +252,7 @@ public void test3MasterNodes2Failed() throws Exception {

logger.info("--> unsafely-bootstrap 1st master-eligible node");
MockTerminal terminal = unsafeBootstrap(environmentMaster1);
Metadata metadata = ElasticsearchNodeCommand.createPersistedClusterStateService(Settings.EMPTY, nodeEnvironment.nodeDataPath())
Metadata metadata = ElasticsearchNodeCommand.createPersistedClusterStateService(Settings.EMPTY, nodeEnvironment.nodeDataPaths())
.loadBestOnDiskState().metadata;
assertThat(terminal.getOutput(), containsString(
String.format(Locale.ROOT, UnsafeBootstrapMasterCommand.CLUSTER_STATE_TERM_VERSION_MSG_FORMAT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,15 @@ public Settings onNodeStopped(String nodeName) {
assertThat(ex.getMessage(), startsWith("node does not have the data role but has shard data"));
}

private IllegalStateException expectThrowsOnRestart(CheckedConsumer<Path, Exception> onNodeStopped) {
private IllegalStateException expectThrowsOnRestart(CheckedConsumer<Path[], Exception> onNodeStopped) {
internalCluster().startNode();
final Path dataPath = internalCluster().getInstance(NodeEnvironment.class).nodeDataPath();
final Path[] dataPaths = internalCluster().getInstance(NodeEnvironment.class).nodeDataPaths();
return expectThrows(IllegalStateException.class,
() -> internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) {
try {
onNodeStopped.accept(dataPath);
onNodeStopped.accept(dataPaths);
} catch (Exception e) {
throw new AssertionError(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,18 +513,20 @@ public void testHalfDeletedIndexImport() throws Exception {
ensureGreen("test");

final Metadata metadata = internalCluster().getInstance(ClusterService.class).state().metadata();
final Path path = internalCluster().getInstance(NodeEnvironment.class).nodeDataPath();
final Path[] paths = internalCluster().getInstance(NodeEnvironment.class).nodeDataPaths();
final String nodeId = client().admin().cluster().prepareNodesInfo(nodeName).clear().get().getNodes().get(0).getNode().getId();

writeBrokenMeta(metaStateService -> {
IOUtils.rm(path.resolve(PersistedClusterStateService.METADATA_DIRECTORY_NAME));
for (final Path path : paths) {
IOUtils.rm(path.resolve(PersistedClusterStateService.METADATA_DIRECTORY_NAME));
}
metaStateService.writeGlobalState("test", Metadata.builder(metadata)
// we remove the manifest file, resetting the term and making this look like an upgrade from 6.x, so must also reset the
// term in the coordination metadata
.coordinationMetadata(CoordinationMetadata.builder(metadata.coordinationMetadata()).term(0L).build())
// add a tombstone but do not delete the index metadata from disk
.putCustom(IndexGraveyard.TYPE, IndexGraveyard.builder().addTombstone(metadata.index("test").getIndex()).build()).build());
NodeMetadata.FORMAT.writeAndCleanup(new NodeMetadata(nodeId, Version.CURRENT), path);
NodeMetadata.FORMAT.writeAndCleanup(new NodeMetadata(nodeId, Version.CURRENT), paths);
});

ensureGreen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public ElasticsearchNodeCommand(String description) {
super(description);
}

public static PersistedClusterStateService createPersistedClusterStateService(Settings settings, Path... dataPaths) throws IOException {
public static PersistedClusterStateService createPersistedClusterStateService(Settings settings, Path[] dataPaths) throws IOException {
final NodeMetadata nodeMetadata = PersistedClusterStateService.nodeMetadata(dataPaths);
if (nodeMetadata == null) {
throw new ElasticsearchException(NO_NODE_METADATA_FOUND_MSG);
Expand Down
12 changes: 9 additions & 3 deletions server/src/main/java/org/elasticsearch/env/NodeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -900,9 +900,13 @@ public boolean hasNodeFile() {
* Returns an array of all of the nodes data locations.
* @throws IllegalStateException if the node is not configured to store local locations
*/
public Path nodeDataPath() {
public Path[] nodeDataPaths() {
assertEnvIsLocked();
return nodePaths[0].path;
Path[] paths = new Path[nodePaths.length];
for(int i=0;i<paths.length;i++) {
paths[i] = nodePaths[i].path;
}
return paths;
}

/**
Expand Down Expand Up @@ -1297,7 +1301,9 @@ public static Path shardStatePathToDataPath(Path shardPath) {
* This prevents disasters if nodes are started under the wrong username etc.
*/
private void assertCanWrite() throws IOException {
tryWriteTempFile(nodeDataPath());
for (Path path : nodeDataPaths()) { // check node-paths are writable
tryWriteTempFile(path);
}
for (String indexFolderName : this.availableIndexFolders()) {
for (Path indexPath : this.resolveIndexFolder(indexFolderName)) { // check index paths are writable
Path indexStatePath = indexPath.resolve(MetadataStateFormat.STATE_DIR_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public MetaStateService(NodeEnvironment nodeEnv, NamedXContentRegistry namedXCon
* @throws IOException if some IOException when loading files occurs or there is no metadata referenced by manifest file.
*/
public Tuple<Manifest, Metadata> loadFullState() throws IOException {
final Manifest manifest = MANIFEST_FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.nodeDataPath());
final Manifest manifest = MANIFEST_FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.nodeDataPaths());
if (manifest == null) {
return loadFullStateBWC();
}
Expand All @@ -68,7 +68,7 @@ public Tuple<Manifest, Metadata> loadFullState() throws IOException {
metadataBuilder = Metadata.builder();
} else {
final Metadata globalMetadata = METADATA_FORMAT.loadGeneration(logger, namedXContentRegistry, manifest.getGlobalGeneration(),
nodeEnv.nodeDataPath());
nodeEnv.nodeDataPaths());
if (globalMetadata != null) {
metadataBuilder = Metadata.builder(globalMetadata);
} else {
Expand Down Expand Up @@ -101,7 +101,7 @@ private Tuple<Manifest, Metadata> loadFullStateBWC() throws IOException {
Metadata.Builder metadataBuilder;

Tuple<Metadata, Long> metadataAndGeneration =
METADATA_FORMAT.loadLatestStateWithGeneration(logger, namedXContentRegistry, nodeEnv.nodeDataPath());
METADATA_FORMAT.loadLatestStateWithGeneration(logger, namedXContentRegistry, nodeEnv.nodeDataPaths());
Metadata globalMetadata = metadataAndGeneration.v1();
long globalStateGeneration = metadataAndGeneration.v2();

Expand Down Expand Up @@ -173,7 +173,7 @@ List<IndexMetadata> loadIndicesStates(Predicate<String> excludeIndexPathIdsPredi
* Loads the global state, *without* index state, see {@link #loadFullState()} for that.
*/
Metadata loadGlobalState() throws IOException {
return METADATA_FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.nodeDataPath());
return METADATA_FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.nodeDataPaths());
}

/**
Expand All @@ -185,7 +185,7 @@ Metadata loadGlobalState() throws IOException {
public void writeManifestAndCleanup(String reason, Manifest manifest) throws WriteStateException {
logger.trace("[_meta] writing state, reason [{}]", reason);
try {
long generation = MANIFEST_FORMAT.writeAndCleanup(manifest, nodeEnv.nodeDataPath());
long generation = MANIFEST_FORMAT.writeAndCleanup(manifest, nodeEnv.nodeDataPaths());
logger.trace("[_meta] state written (generation: {})", generation);
} catch (WriteStateException ex) {
throw new WriteStateException(ex.isDirty(), "[_meta]: failed to write meta state", ex);
Expand Down Expand Up @@ -222,7 +222,7 @@ public long writeIndex(String reason, IndexMetadata indexMetadata) throws WriteS
long writeGlobalState(String reason, Metadata metadata) throws WriteStateException {
logger.trace("[_global] writing state, reason [{}]", reason);
try {
long generation = METADATA_FORMAT.write(metadata, nodeEnv.nodeDataPath());
long generation = METADATA_FORMAT.write(metadata, nodeEnv.nodeDataPaths());
logger.trace("[_global] state written");
return generation;
} catch (WriteStateException ex) {
Expand All @@ -236,7 +236,7 @@ long writeGlobalState(String reason, Metadata metadata) throws WriteStateExcepti
* @param currentGeneration current state generation to keep in the directory.
*/
void cleanupGlobalState(long currentGeneration) {
METADATA_FORMAT.cleanupOldFiles(currentGeneration, nodeEnv.nodeDataPath());
METADATA_FORMAT.cleanupOldFiles(currentGeneration, nodeEnv.nodeDataPaths());
}

/**
Expand All @@ -254,8 +254,8 @@ public void cleanupIndex(Index index, long currentGeneration) {
* (only used for dangling indices at that point).
*/
public void unreferenceAll() throws IOException {
MANIFEST_FORMAT.writeAndCleanup(Manifest.empty(), nodeEnv.nodeDataPath()); // write empty file so that indices become unreferenced
METADATA_FORMAT.cleanupOldFiles(Long.MAX_VALUE, nodeEnv.nodeDataPath());
MANIFEST_FORMAT.writeAndCleanup(Manifest.empty(), nodeEnv.nodeDataPaths()); // write empty file so that indices become unreferenced
METADATA_FORMAT.cleanupOldFiles(Long.MAX_VALUE, nodeEnv.nodeDataPaths());
}

/**
Expand All @@ -270,6 +270,6 @@ public void deleteAll() throws IOException {
// delete meta state directories of indices
MetadataStateFormat.deleteMetaState(nodeEnv.resolveIndexFolder(indexFolderName));
}
MANIFEST_FORMAT.cleanupOldFiles(Long.MAX_VALUE, nodeEnv.nodeDataPath()); // finally delete manifest
MANIFEST_FORMAT.cleanupOldFiles(Long.MAX_VALUE, nodeEnv.nodeDataPaths()); // finally delete manifest
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public class PersistedClusterStateService {

public PersistedClusterStateService(NodeEnvironment nodeEnvironment, NamedXContentRegistry namedXContentRegistry, BigArrays bigArrays,
ClusterSettings clusterSettings, LongSupplier relativeTimeMillisSupplier) {
this(new Path[] { nodeEnvironment.nodeDataPath() }, nodeEnvironment.nodeId(), namedXContentRegistry, bigArrays, clusterSettings,
this(nodeEnvironment.nodeDataPaths(), nodeEnvironment.nodeId(), namedXContentRegistry, bigArrays, clusterSettings,
relativeTimeMillisSupplier);
}

Expand Down Expand Up @@ -205,7 +205,7 @@ private static IndexWriter createIndexWriter(Directory directory, boolean openEx
* Remove all persisted cluster states from the given data paths, for use in tests. Should only be called when there is no open
* {@link Writer} on these paths.
*/
public static void deleteAll(Path... dataPaths) throws IOException {
public static void deleteAll(Path[] dataPaths) throws IOException {
for (Path dataPath : dataPaths) {
Lucene.cleanLuceneIndex(new NIOFSDirectory(dataPath.resolve(METADATA_DIRECTORY_NAME)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.NodeEnvironment;
Expand All @@ -29,7 +30,10 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashSet;
import java.util.Set;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

import static org.elasticsearch.monitor.StatusInfo.Status.HEALTHY;
import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY;
Expand All @@ -41,14 +45,17 @@ public class FsHealthService extends AbstractLifecycleComponent implements NodeH

private static final Logger logger = LogManager.getLogger(FsHealthService.class);
private final ThreadPool threadPool;
private volatile StatusInfo statusInfo = new StatusInfo(HEALTHY, "not started");
private volatile boolean enabled;
private volatile boolean brokenLock;
private final TimeValue refreshInterval;
private volatile TimeValue slowPathLoggingThreshold;
private final NodeEnvironment nodeEnv;
private final LongSupplier currentTimeMillisSupplier;
private volatile Scheduler.Cancellable scheduledFuture;

@Nullable
private volatile Set<Path> unhealthyPaths;

public static final Setting<Boolean> ENABLED_SETTING =
Setting.boolSetting("monitor.fs.health.enabled", true, Setting.Property.NodeScope, Setting.Property.Dynamic);
public static final Setting<TimeValue> REFRESH_INTERVAL_SETTING =
Expand Down Expand Up @@ -94,8 +101,18 @@ public void setSlowPathLoggingThreshold(TimeValue slowPathLoggingThreshold) {

@Override
public StatusInfo getHealth() {
StatusInfo statusInfo;
Set<Path> unhealthyPaths = this.unhealthyPaths;
if (enabled == false) {
return new StatusInfo(HEALTHY, "health check disabled");
statusInfo = new StatusInfo(HEALTHY, "health check disabled");
} else if (brokenLock) {
statusInfo = new StatusInfo(UNHEALTHY, "health check failed due to broken node lock");
} else if (unhealthyPaths == null) {
statusInfo = new StatusInfo(HEALTHY, "health check passed");
} else {
String info = "health check failed on [" + unhealthyPaths.stream()
.map(k -> k.toString()).collect(Collectors.joining(",")) + "]";
statusInfo = new StatusInfo(UNHEALTHY, info);
}

return statusInfo;
Expand All @@ -104,7 +121,7 @@ public StatusInfo getHealth() {
class FsHealthMonitor implements Runnable {

static final String TEMP_FILE_NAME = ".es_temp_file";
private final byte[] bytesToWrite;
private byte[] bytesToWrite;

FsHealthMonitor(){
this.bytesToWrite = UUIDs.randomBase64UUID().getBytes(StandardCharsets.UTF_8);
Expand All @@ -123,38 +140,43 @@ public void run() {
}

private void monitorFSHealth() {
final Path path;
Set<Path> currentUnhealthyPaths = null;
final Path[] paths;
try {
path = nodeEnv.nodeDataPath();
paths = nodeEnv.nodeDataPaths();
} catch (IllegalStateException e) {
statusInfo = new StatusInfo(UNHEALTHY, "health check failed due to broken node lock");
logger.error("health check failed", e);
brokenLock = true;
return;
}

final long executionStartTime = currentTimeMillisSupplier.getAsLong();
try {
if (Files.exists(path)) {
final Path tempDataPath = path.resolve(TEMP_FILE_NAME);
Files.deleteIfExists(tempDataPath);
try (OutputStream os = Files.newOutputStream(tempDataPath, StandardOpenOption.CREATE_NEW)) {
os.write(bytesToWrite);
IOUtils.fsync(tempDataPath, false);
for (Path path : paths) {
final long executionStartTime = currentTimeMillisSupplier.getAsLong();
try {
if (Files.exists(path)) {
final Path tempDataPath = path.resolve(TEMP_FILE_NAME);
Files.deleteIfExists(tempDataPath);
try (OutputStream os = Files.newOutputStream(tempDataPath, StandardOpenOption.CREATE_NEW)) {
os.write(bytesToWrite);
IOUtils.fsync(tempDataPath, false);
}
Files.delete(tempDataPath);
final long elapsedTime = currentTimeMillisSupplier.getAsLong() - executionStartTime;
if (elapsedTime > slowPathLoggingThreshold.millis()) {
logger.warn("health check of [{}] took [{}ms] which is above the warn threshold of [{}]",
path, elapsedTime, slowPathLoggingThreshold);
}
}
Files.delete(tempDataPath);
final long elapsedTime = currentTimeMillisSupplier.getAsLong() - executionStartTime;
if (elapsedTime > slowPathLoggingThreshold.millis()) {
logger.warn("health check of [{}] took [{}ms] which is above the warn threshold of [{}]",
path, elapsedTime, slowPathLoggingThreshold);
} catch (Exception ex) {
logger.error(new ParameterizedMessage("health check of [{}] failed", path), ex);
if (currentUnhealthyPaths == null) {
currentUnhealthyPaths = new HashSet<>(1);
}
currentUnhealthyPaths.add(path);
}
} catch (Exception ex) {
statusInfo = new StatusInfo(UNHEALTHY, "health check failed on [" + path + "]");
logger.error(new ParameterizedMessage("health check of [{}] failed", path), ex);
return;
}

statusInfo = new StatusInfo(HEALTHY, "health check passed");
unhealthyPaths = currentUnhealthyPaths;
brokenLock = false;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,7 @@ public Node start() throws NodeValidationException {
try {
assert injector.getInstance(MetaStateService.class).loadFullState().v1().isEmpty();
final NodeMetadata nodeMetadata = NodeMetadata.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY,
nodeEnvironment.nodeDataPath());
nodeEnvironment.nodeDataPaths());
assert nodeMetadata != null;
assert nodeMetadata.nodeVersion().equals(Version.CURRENT);
assert nodeMetadata.nodeId().equals(localNodeFactory.getNode().getId());
Expand Down
Loading

0 comments on commit 5744ed5

Please sign in to comment.