Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add command-line tool support for Lucene-based metadata storage #50179

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.elasticsearch.packaging.util.ServerUtils;
import org.elasticsearch.packaging.util.Shell.Result;
import org.junit.BeforeClass;
import org.junit.Ignore;

import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -383,7 +382,6 @@ public void test92ElasticsearchNodeCliPackaging() throws Exception {
}
}

@Ignore("https://github.com/elastic/elasticsearch/issues/48701") // TODO unsafe bootstrapping
public void test93ElasticsearchNodeCustomDataPathAndNotEsHomeWorkDir() throws Exception {
Path relativeDataPath = installation.data.relativize(installation.home);
append(installation.config("elasticsearch.yml"), "path.data: " + relativeDataPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
*/
package org.elasticsearch.cluster.coordination;

import joptsimple.OptionSet;
import org.elasticsearch.cli.Terminal;
import org.elasticsearch.cluster.metadata.Manifest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.env.Environment;
import org.elasticsearch.gateway.LucenePersistedStateFactory;

import java.io.IOException;
import java.nio.file.Path;
Expand All @@ -48,14 +49,21 @@ public DetachClusterCommand() {


@Override
protected void processNodePaths(Terminal terminal, Path[] dataPaths, Environment env) throws IOException {
final Tuple<Manifest, MetaData> manifestMetaDataTuple = loadMetaData(terminal, dataPaths);
final Manifest manifest = manifestMetaDataTuple.v1();
final MetaData metaData = manifestMetaDataTuple.v2();
protected void processNodePaths(Terminal terminal, Path[] dataPaths, OptionSet options, Environment env) throws IOException {
final LucenePersistedStateFactory psf = createLucenePersistedStateFactory(dataPaths);

terminal.println(Terminal.Verbosity.VERBOSE, "Loading cluster state");
final ClusterState oldClusterState = loadTermAndClusterState(psf, env).v2();
final ClusterState newClusterState = ClusterState.builder(oldClusterState)
.metaData(updateMetaData(oldClusterState.metaData())).build();
terminal.println(Terminal.Verbosity.VERBOSE,
"[old cluster state = " + oldClusterState + ", new cluster state = " + newClusterState + "]");

confirm(terminal, CONFIRMATION_MSG);

writeNewMetaData(terminal, manifest, updateCurrentTerm(), metaData, updateMetaData(metaData), dataPaths);
try (LucenePersistedStateFactory.Writer writer = psf.createWriter()) {
writer.writeFullStateAndCommit(updateCurrentTerm(), newClusterState);
}

terminal.println(NODE_DETACHED_MSG);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,41 +26,77 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cli.EnvironmentAwareCommand;
import org.elasticsearch.cli.Terminal;
import org.elasticsearch.cluster.metadata.Manifest;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.NodeMetaData;
import org.elasticsearch.gateway.LucenePersistedStateFactory;
import org.elasticsearch.indices.IndicesModule;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public abstract class ElasticsearchNodeCommand extends EnvironmentAwareCommand {
private static final Logger logger = LogManager.getLogger(ElasticsearchNodeCommand.class);
protected static final String DELIMITER = "------------------------------------------------------------------------\n";

static final String STOP_WARNING_MSG =
DELIMITER +
"\n" +
" WARNING: Elasticsearch MUST be stopped before running this tool." +
"\n";
protected static final String FAILED_TO_OBTAIN_NODE_LOCK_MSG = "failed to lock node's directory, is Elasticsearch still running?";
static final String NO_NODE_FOLDER_FOUND_MSG = "no node folder is found in data folder(s), node has not been started yet?";
static final String NO_MANIFEST_FILE_FOUND_MSG = "no manifest file is found, do you run pre 7.0 Elasticsearch?";
protected static final String GLOBAL_GENERATION_MISSING_MSG =
"no metadata is referenced from the manifest file, cluster has never been bootstrapped?";
static final String NO_GLOBAL_METADATA_MSG = "failed to find global metadata, metadata corrupted?";
static final String WRITE_METADATA_EXCEPTION_MSG = "exception occurred when writing new metadata to disk";
protected static final String ABORTED_BY_USER_MSG = "aborted by user";
static final String NO_NODE_FOLDER_FOUND_MSG = "no node folder is found in data folder(s), node has not been started yet?";
static final String NO_NODE_METADATA_FOUND_MSG = "no node meta data is found, node has not been started yet?";
protected static final String CS_MISSING_MSG =
"cluster state is empty, cluster has never been bootstrapped?";

protected static final NamedXContentRegistry namedXContentRegistry = new NamedXContentRegistry(
Stream.of(ClusterModule.getNamedXWriteables().stream(), IndicesModule.getNamedXContents().stream())
.flatMap(Function.identity())
.collect(Collectors.toList()));

public ElasticsearchNodeCommand(String description) {
super(description);
}

public static LucenePersistedStateFactory createLucenePersistedStateFactory(Path[] dataPaths) throws IOException {
final NodeMetaData nodeMetaData = NodeMetaData.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, dataPaths);
if (nodeMetaData == null) {
throw new ElasticsearchException(NO_NODE_METADATA_FOUND_MSG);
}

String nodeId = nodeMetaData.nodeId();
return new LucenePersistedStateFactory(dataPaths, nodeId, namedXContentRegistry, BigArrays.NON_RECYCLING_INSTANCE, true);
}

public static ClusterState clusterState(Environment environment, LucenePersistedStateFactory.OnDiskState onDiskState) {
return ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(environment.settings()))
.version(onDiskState.lastAcceptedVersion)
.metaData(onDiskState.metaData)
.build();
}

public static Tuple<Long, ClusterState> loadTermAndClusterState(LucenePersistedStateFactory psf,
Environment env) throws IOException {
final LucenePersistedStateFactory.OnDiskState bestOnDiskState = psf.loadBestOnDiskState();
if (bestOnDiskState.empty()) {
throw new ElasticsearchException(CS_MISSING_MSG);
}
return Tuple.tuple(bestOnDiskState.currentTerm, clusterState(env, bestOnDiskState));
}

protected void processNodePaths(Terminal terminal, OptionSet options, Environment env) throws IOException {
terminal.println(Terminal.Verbosity.VERBOSE, "Obtaining lock for node");
try (NodeEnvironment.NodeLock lock = new NodeEnvironment.NodeLock(logger, env, Files::exists)) {
Expand All @@ -69,32 +105,12 @@ protected void processNodePaths(Terminal terminal, OptionSet options, Environmen
if (dataPaths.length == 0) {
throw new ElasticsearchException(NO_NODE_FOLDER_FOUND_MSG);
}
processNodePaths(terminal, dataPaths, env);
processNodePaths(terminal, dataPaths, options, env);
} catch (LockObtainFailedException e) {
throw new ElasticsearchException(FAILED_TO_OBTAIN_NODE_LOCK_MSG, e);
}
}

protected Tuple<Manifest, MetaData> loadMetaData(Terminal terminal, Path[] dataPaths) throws IOException {
terminal.println(Terminal.Verbosity.VERBOSE, "Loading manifest file");
final Manifest manifest = Manifest.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, dataPaths);

if (manifest == null) {
throw new ElasticsearchException(NO_MANIFEST_FILE_FOUND_MSG);
}
if (manifest.isGlobalGenerationMissing()) {
throw new ElasticsearchException(GLOBAL_GENERATION_MISSING_MSG);
}
terminal.println(Terminal.Verbosity.VERBOSE, "Loading global metadata file");
final MetaData metaData = MetaData.FORMAT_PRESERVE_CUSTOMS.loadGeneration(
logger, NamedXContentRegistry.EMPTY, manifest.getGlobalGeneration(), dataPaths);
if (metaData == null) {
throw new ElasticsearchException(NO_GLOBAL_METADATA_MSG + " [generation = " + manifest.getGlobalGeneration() + "]");
}

return Tuple.tuple(manifest, metaData);
}

protected void confirm(Terminal terminal, String msg) {
terminal.println(msg);
String text = terminal.readText("Confirm [y/N] ");
Expand All @@ -104,7 +120,7 @@ protected void confirm(Terminal terminal, String msg) {
}

@Override
protected final void execute(Terminal terminal, OptionSet options, Environment env) throws Exception {
public final void execute(Terminal terminal, OptionSet options, Environment env) throws Exception {
terminal.println(STOP_WARNING_MSG);
if (validateBeforeLock(terminal, env)) {
processNodePaths(terminal, options, env);
Expand All @@ -126,44 +142,10 @@ protected boolean validateBeforeLock(Terminal terminal, Environment env) {
* Process the paths. Locks for the paths is held during this method invocation.
* @param terminal the terminal to use for messages
* @param dataPaths the paths of the node to process
* @param options the command line options
* @param env the env of the node to process
*/
protected abstract void processNodePaths(Terminal terminal, Path[] dataPaths, Environment env) throws IOException;


protected void writeNewMetaData(Terminal terminal, Manifest oldManifest, long newCurrentTerm,
MetaData oldMetaData, MetaData newMetaData, Path[] dataPaths) {
long newGeneration;
try {
terminal.println(Terminal.Verbosity.VERBOSE,
"[clusterUUID = " + oldMetaData.clusterUUID() + ", committed = " + oldMetaData.clusterUUIDCommitted() + "] => " +
"[clusterUUID = " + newMetaData.clusterUUID() + ", committed = " + newMetaData.clusterUUIDCommitted() + "]");
terminal.println(Terminal.Verbosity.VERBOSE, "New coordination metadata is " + newMetaData.coordinationMetaData());
terminal.println(Terminal.Verbosity.VERBOSE, "Writing new global metadata to disk");
newGeneration = MetaData.FORMAT.write(newMetaData, dataPaths);
Manifest newManifest = new Manifest(newCurrentTerm, oldManifest.getClusterStateVersion(), newGeneration,
oldManifest.getIndexGenerations());
terminal.println(Terminal.Verbosity.VERBOSE, "New manifest is " + newManifest);
terminal.println(Terminal.Verbosity.VERBOSE, "Writing new manifest file to disk");
Manifest.FORMAT.writeAndCleanup(newManifest, dataPaths);
} catch (Exception e) {
terminal.println(Terminal.Verbosity.VERBOSE, "Cleaning up new metadata");
MetaData.FORMAT.cleanupOldFiles(oldManifest.getGlobalGeneration(), dataPaths);
throw new ElasticsearchException(WRITE_METADATA_EXCEPTION_MSG, e);
}
// if cleaning old files fail, we still succeeded.
try {
cleanUpOldMetaData(terminal, dataPaths, newGeneration);
} catch (Exception e) {
terminal.println(Terminal.Verbosity.SILENT,
"Warning: Cleaning up old metadata failed, but operation was otherwise successful (message: " + e.getMessage() + ")");
}
}

protected void cleanUpOldMetaData(Terminal terminal, Path[] dataPaths, long newGeneration) {
terminal.println(Terminal.Verbosity.VERBOSE, "Cleaning up old metadata");
MetaData.FORMAT.cleanupOldFiles(newGeneration, dataPaths);
}
protected abstract void processNodePaths(Terminal terminal, Path[] dataPaths, OptionSet options, Environment env) throws IOException;

protected NodeEnvironment.NodePath[] toNodePaths(Path[] dataPaths) {
return Arrays.stream(dataPaths).map(ElasticsearchNodeCommand::createNodePath).toArray(NodeEnvironment.NodePath[]::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,17 @@
*/
package org.elasticsearch.cluster.coordination;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import joptsimple.OptionSet;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cli.Terminal;
import org.elasticsearch.cluster.metadata.Manifest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeMetaData;
import org.elasticsearch.gateway.LucenePersistedStateFactory;
import org.elasticsearch.node.Node;

import java.io.IOException;
Expand All @@ -40,8 +38,6 @@

public class UnsafeBootstrapMasterCommand extends ElasticsearchNodeCommand {

private static final Logger logger = LogManager.getLogger(UnsafeBootstrapMasterCommand.class);

static final String CLUSTER_STATE_TERM_VERSION_MSG_FORMAT =
"Current node cluster state (term, version) pair is (%s, %s)";
static final String CONFIRMATION_MSG =
Expand All @@ -58,8 +54,6 @@ public class UnsafeBootstrapMasterCommand extends ElasticsearchNodeCommand {

static final String NOT_MASTER_NODE_MSG = "unsafe-bootstrap tool can only be run on master eligible node";

static final String NO_NODE_METADATA_FOUND_MSG = "no node meta data is found, node has not been started yet?";

static final String EMPTY_LAST_COMMITTED_VOTING_CONFIG_MSG =
"last committed voting voting configuration is empty, cluster has never been bootstrapped?";

Expand All @@ -83,49 +77,52 @@ protected boolean validateBeforeLock(Terminal terminal, Environment env) {
return true;
}

protected void processNodePaths(Terminal terminal, Path[] dataPaths, Environment env) throws IOException {
terminal.println(Terminal.Verbosity.VERBOSE, "Loading node metadata");
final NodeMetaData nodeMetaData = NodeMetaData.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, dataPaths);
if (nodeMetaData == null) {
throw new ElasticsearchException(NO_NODE_METADATA_FOUND_MSG);
}
protected void processNodePaths(Terminal terminal, Path[] dataPaths, OptionSet options, Environment env) throws IOException {
final LucenePersistedStateFactory psf = createLucenePersistedStateFactory(dataPaths);

String nodeId = nodeMetaData.nodeId();
terminal.println(Terminal.Verbosity.VERBOSE, "Current nodeId is " + nodeId);
final Tuple<Long, ClusterState> state = loadTermAndClusterState(psf, env);
final ClusterState oldClusterState = state.v2();

final MetaData metaData = oldClusterState.metaData();

final Tuple<Manifest, MetaData> manifestMetaDataTuple = loadMetaData(terminal, dataPaths);
final Manifest manifest = manifestMetaDataTuple.v1();
final MetaData metaData = manifestMetaDataTuple.v2();
final CoordinationMetaData coordinationMetaData = metaData.coordinationMetaData();
if (coordinationMetaData == null ||
coordinationMetaData.getLastCommittedConfiguration() == null ||
coordinationMetaData.getLastCommittedConfiguration().isEmpty()) {
coordinationMetaData.getLastCommittedConfiguration() == null ||
coordinationMetaData.getLastCommittedConfiguration().isEmpty()) {
throw new ElasticsearchException(EMPTY_LAST_COMMITTED_VOTING_CONFIG_MSG);
}
terminal.println(String.format(Locale.ROOT, CLUSTER_STATE_TERM_VERSION_MSG_FORMAT, coordinationMetaData.term(),
metaData.version()));

confirm(terminal, CONFIRMATION_MSG);
metaData.version()));

CoordinationMetaData newCoordinationMetaData = CoordinationMetaData.builder(coordinationMetaData)
.clearVotingConfigExclusions()
.lastAcceptedConfiguration(new CoordinationMetaData.VotingConfiguration(Collections.singleton(nodeId)))
.lastCommittedConfiguration(new CoordinationMetaData.VotingConfiguration(Collections.singleton(nodeId)))
.build();
.clearVotingConfigExclusions()
.lastAcceptedConfiguration(new CoordinationMetaData.VotingConfiguration(Collections.singleton(psf.getNodeId())))
.lastCommittedConfiguration(new CoordinationMetaData.VotingConfiguration(Collections.singleton(psf.getNodeId())))
.build();

Settings persistentSettings = Settings.builder()
.put(metaData.persistentSettings())
.put(UNSAFE_BOOTSTRAP.getKey(), true)
.build();
.put(metaData.persistentSettings())
.put(UNSAFE_BOOTSTRAP.getKey(), true)
.build();
MetaData newMetaData = MetaData.builder(metaData)
.clusterUUID(MetaData.UNKNOWN_CLUSTER_UUID)
.generateClusterUuidIfNeeded()
.clusterUUIDCommitted(true)
.persistentSettings(persistentSettings)
.coordinationMetaData(newCoordinationMetaData)
.build();

writeNewMetaData(terminal, manifest, manifest.getCurrentTerm(), metaData, newMetaData, dataPaths);
.clusterUUID(MetaData.UNKNOWN_CLUSTER_UUID)
.generateClusterUuidIfNeeded()
.clusterUUIDCommitted(true)
.persistentSettings(persistentSettings)
.coordinationMetaData(newCoordinationMetaData)
.build();

final ClusterState newClusterState = ClusterState.builder(oldClusterState)
.metaData(newMetaData).build();

terminal.println(Terminal.Verbosity.VERBOSE,
"[old cluster state = " + oldClusterState + ", new cluster state = " + newClusterState + "]");

confirm(terminal, CONFIRMATION_MSG);

try (LucenePersistedStateFactory.Writer writer = psf.createWriter()) {
writer.writeFullStateAndCommit(state.v1(), newClusterState);
}

terminal.println(MASTER_NODE_BOOTSTRAPPED_MSG);
}
Expand Down
Loading