Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Safer in-place migration handling #20366

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
4 changes: 4 additions & 0 deletions changelog/unreleased/pr-20366.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
type = "f"
message = "Datanode preflight check for node.lock, preventing parallel run of opensearch and datanode"

pulls = ["20366"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.datanode.rest;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.graylog.datanode.configuration.DatanodeConfiguration;
import org.graylog.plugins.views.storage.migration.state.actions.OpensearchLockCheckResult;
import org.graylog.plugins.views.storage.migration.state.actions.OpensearchNodeLock;
import org.graylog2.bootstrap.preflight.PreflightCheckException;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.NonWritableChannelException;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * REST resource that reports, for every entry below {@code <dataTargetDir>/nodes}, whether its
 * {@code node.lock} file is currently locked by some process.
 *
 * <p>A locked {@code node.lock} indicates that another process is still using that node directory,
 * so callers can use the result to avoid running two processes on the same data directory.
 */
@Path("/lock-check")
@Produces(MediaType.APPLICATION_JSON)
public class OpensearchLockCheckController {

    private final java.nio.file.Path dataTargetDir;

    /**
     * Creates the controller using the data target directory from the datanode configuration.
     *
     * @param datanodeConfiguration configuration providing the datanode directories
     */
    @Inject
    public OpensearchLockCheckController(DatanodeConfiguration datanodeConfiguration) {
        this(datanodeConfiguration.datanodeDirectories().getDataTargetDir());
    }

    /**
     * Creates the controller for an explicit data directory.
     *
     * @param dataTargetDir directory that is expected to contain the {@code nodes} subdirectory
     */
    public OpensearchLockCheckController(java.nio.file.Path dataTargetDir) {
        this.dataTargetDir = dataTargetDir;
    }

    /**
     * Checks the lock state of every entry in the {@code nodes} subdirectory.
     *
     * @return one {@link OpensearchNodeLock} per entry found in {@code nodes}; an empty result when
     *         the {@code nodes} directory does not exist
     * @throws UncheckedIOException    if the {@code nodes} directory cannot be listed
     * @throws PreflightCheckException if a lock file exists but its lock state cannot be determined
     */
    @GET
    public OpensearchLockCheckResult checkLockFiles() {
        final java.nio.file.Path nodesDir = dataTargetDir.resolve("nodes");
        if (!Files.isDirectory(nodesDir)) {
            return new OpensearchLockCheckResult(Collections.emptyList());
        }
        // Files.list keeps a directory handle open, hence the try-with-resources.
        try (final Stream<java.nio.file.Path> nodes = Files.list(nodesDir)) {
            return nodes.map(n -> new OpensearchNodeLock(n, isDirLocked(n)))
                    .collect(Collectors.collectingAndThen(Collectors.toList(), OpensearchLockCheckResult::new));
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to list node directories in " + nodesDir, e);
        }
    }

    /**
     * Determines whether the {@code node.lock} file inside the given directory is locked.
     *
     * @param nodeDir directory that may contain a {@code node.lock} file
     * @return {@code true} if the lock file exists and a lock on it could not be acquired;
     *         {@code false} if there is no lock file or the lock was free
     * @throws PreflightCheckException if the lock file cannot be opened or probed
     */
    private static boolean isDirLocked(java.nio.file.Path nodeDir) {
        final java.nio.file.Path lockFile = nodeDir.resolve("node.lock");
        if (!Files.exists(lockFile)) {
            return false;
        }
        // tryLock() returns null when another process holds the lock. try-with-resources accepts a
        // null resource and guarantees that a lock we did acquire is released before the channel closes.
        try (FileChannel channel = FileChannel.open(lockFile, StandardOpenOption.WRITE);
             FileLock probe = channel.tryLock()) {
            return probe == null;
        } catch (OverlappingFileLockException e) {
            // The lock is already held within this JVM.
            return true;
        } catch (NoSuchFileException e) {
            // The file vanished between the existence check and the open: nothing holds a lock on it.
            return false;
        } catch (NonWritableChannelException | IOException e) {
            throw new PreflightCheckException("Failed to verify free node.lock file " + lockFile, e);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@ protected void configure() {
addSystemRestResource(OpensearchConnectionCheckController.class);
addSystemRestResource(IndexStateController.class);
addSystemRestResource(CertificatesController.class);
addSystemRestResource(OpensearchLockCheckController.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.datanode.rest;

import jakarta.annotation.Nonnull;
import org.assertj.core.api.Assertions;
import org.graylog.plugins.views.storage.migration.state.actions.OpensearchLockCheckResult;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/**
 * Tests for {@link OpensearchLockCheckController#checkLockFiles()} against a temporary data directory
 * laid out as {@code <tempDir>/nodes/0/node.lock}.
 */
class OpensearchLockCheckControllerTest {

    /** A lock file that exists but is not locked must be reported as unlocked. */
    @Test
    void testNotLockedDir(@TempDir Path tempDir) throws IOException {
        final OpensearchLockCheckController controller = new OpensearchLockCheckController(tempDir);
        createLockFile(tempDir);
        final OpensearchLockCheckResult result = controller.checkLockFiles();
        Assertions.assertThat(result.locks())
                .hasSize(1)
                .allSatisfy(l -> Assertions.assertThat(l.locked()).isFalse());
    }

    /** A lock file that is currently locked must be reported as locked. */
    @Test
    void testLockedDir(@TempDir Path tempDir) throws IOException {
        final OpensearchLockCheckController controller = new OpensearchLockCheckController(tempDir);
        final Path lockFile = createLockFile(tempDir);
        // Hold the lock only while the check runs. Closing the lock and channel afterwards avoids
        // leaking the file handle and lets the temporary directory be cleaned up.
        try (FileChannel channel = FileChannel.open(lockFile, StandardOpenOption.WRITE);
             FileLock ignored = channel.lock()) {
            final OpensearchLockCheckResult result = controller.checkLockFiles();
            Assertions.assertThat(result.locks())
                    .hasSize(1)
                    .allSatisfy(l -> Assertions.assertThat(l.locked()).isTrue());
        }
    }

    /** A node directory without any lock file must be reported as unlocked. */
    @Test
    void testNodeDirWithoutLockFile(@TempDir Path tempDir) throws IOException {
        final OpensearchLockCheckController controller = new OpensearchLockCheckController(tempDir);
        Files.createDirectories(tempDir.resolve("nodes").resolve("0"));
        final OpensearchLockCheckResult result = controller.checkLockFiles();
        Assertions.assertThat(result.locks())
                .hasSize(1)
                .allSatisfy(l -> Assertions.assertThat(l.locked()).isFalse());
    }

    /** Without a {@code nodes} directory there is nothing to report. */
    @Test
    void testEmptyDir(@TempDir Path tempDir) {
        final OpensearchLockCheckController controller = new OpensearchLockCheckController(tempDir);
        final OpensearchLockCheckResult result = controller.checkLockFiles();
        Assertions.assertThat(result.locks())
                .isEmpty();
    }

    /**
     * Creates {@code nodes/0/node.lock} below the given directory.
     *
     * @return path of the created, unlocked lock file
     */
    @Nonnull
    private static Path createLockFile(Path tempDir) throws IOException {
        final Path nodeDir = tempDir.resolve("nodes").resolve("0");
        Files.createDirectories(nodeDir);
        final Path lockFile = nodeDir.resolve("node.lock");
        Files.createFile(lockFile);
        return lockFile;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.plugins.views.storage.migration.state.actions;

import retrofit2.Call;
import retrofit2.http.GET;

/**
 * Retrofit client interface for the lock-check REST resource.
 */
public interface DatanodeOpensearchClusterCheckResource {

/**
 * Issues a {@code GET /lock-check} request.
 *
 * @return a call whose response body is the {@link OpensearchLockCheckResult} reported by the remote side
 */
@GET("/lock-check")
Call<OpensearchLockCheckResult> checkLocks();

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ public interface MigrationActions {

void startDataNodes();

boolean dataNodeStartupFinished();
boolean allDatanodesAvailable();

void setPreflightFinished();

void startRemoteReindex();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,15 @@ public void runDirectoryCompatibilityCheck() {

@Override
public boolean isOldClusterStopped() {
// TODO: add real test
return true;
final Map<String, OpensearchLockCheckResult> results = datanodeProxy.remoteInterface(DatanodeResolver.ALL_NODES_KEYWORD, DatanodeOpensearchClusterCheckResource.class, DatanodeOpensearchClusterCheckResource::checkLocks);
final boolean anyLocked = results.values().stream().anyMatch(v -> v.locks().stream().anyMatch(OpensearchNodeLock::locked));

if (anyLocked) {
results.forEach((key, value) -> value.locks().stream()
.filter(OpensearchNodeLock::locked)
.forEach(v -> LOG.info("Data directory of datanode {} is still locked by another Opensearch process. Lock file: {}", key, v.path().toAbsolutePath())));
}
return !anyLocked;
}

@Override
Expand Down Expand Up @@ -232,15 +239,18 @@ private void startDataNode(DataNodeDto node) {
}

@Override
public boolean dataNodeStartupFinished() {
boolean dataNodesAvailable = nodeService.allActive().values().stream().allMatch(node -> node.getDataNodeStatus() == DataNodeStatus.AVAILABLE);
if (dataNodesAvailable) { // set preflight config to FINISHED to be sure that a Graylog restart will connect to the data nodes
var preflight = preflightConfigService.getPreflightConfigResult();
if (preflight == null || !preflight.equals(PreflightConfigResult.FINISHED)) {
preflightConfigService.setConfigResult(PreflightConfigResult.FINISHED);
}
public boolean allDatanodesAvailable() {
final Map<String, DataNodeDto> activeNodes = nodeService.allActive();
return !activeNodes.isEmpty() && activeNodes.values()
.stream()
.allMatch(node -> node.getDataNodeStatus() == DataNodeStatus.AVAILABLE);
}

public void setPreflightFinished() {
var preflight = preflightConfigService.getPreflightConfigResult();
if (preflight == null || !preflight.equals(PreflightConfigResult.FINISHED)) {
preflightConfigService.setConfigResult(PreflightConfigResult.FINISHED);
}
return dataNodesAvailable;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.plugins.views.storage.migration.state.actions;

import java.util.Collection;

/**
 * Result of a lock check: the lock state of each inspected node path.
 *
 * @param locks one {@link OpensearchNodeLock} per inspected path; may be empty when nothing was inspected
 */
public record OpensearchLockCheckResult(Collection<OpensearchNodeLock> locks) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.plugins.views.storage.migration.state.actions;

import java.nio.file.Path;

/**
 * Lock state of a single inspected path.
 *
 * @param path   the path the lock state refers to
 *               NOTE(review): the datanode lock-check controller passes the node directory here,
 *               not the {@code node.lock} file itself — confirm that consumers expect this
 * @param locked {@code true} if the path was found to be locked
 */
public record OpensearchNodeLock(Path path, boolean locked) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,13 @@ private static StateMachineConfig<MigrationState, MigrationStep> configureStates
// remote reindexing branch of the migration
config.configure(MigrationState.REMOTE_REINDEX_WELCOME_PAGE)
.onEntry(migrationActions::reindexUpgradeSelected)
.permitIf(MigrationStep.PROVISION_DATANODE_CERTIFICATES, MigrationState.PROVISION_DATANODE_CERTIFICATES_RUNNING, () -> !migrationActions.dataNodeStartupFinished() && migrationActions.compatibleDatanodesRunning(), migrationActions::provisionAndStartDataNodes)
.permitIf(MigrationStep.SHOW_DATA_MIGRATION_QUESTION, MigrationState.EXISTING_DATA_MIGRATION_QUESTION_PAGE, migrationActions::dataNodeStartupFinished);
.permitIf(MigrationStep.PROVISION_DATANODE_CERTIFICATES, MigrationState.PROVISION_DATANODE_CERTIFICATES_RUNNING, () -> !migrationActions.allDatanodesAvailable() && migrationActions.compatibleDatanodesRunning(), migrationActions::provisionAndStartDataNodes)
.permitIf(MigrationStep.SHOW_DATA_MIGRATION_QUESTION, MigrationState.EXISTING_DATA_MIGRATION_QUESTION_PAGE, migrationActions::allDatanodesAvailable)
.onExit(migrationActions::setPreflightFinished);

// This page should contain the "Please restart Graylog to continue with data migration"
config.configure(MigrationState.PROVISION_DATANODE_CERTIFICATES_RUNNING)
.permitIf(MigrationStep.SHOW_DATA_MIGRATION_QUESTION, MigrationState.EXISTING_DATA_MIGRATION_QUESTION_PAGE, migrationActions::dataNodeStartupFinished);
.permitIf(MigrationStep.SHOW_DATA_MIGRATION_QUESTION, MigrationState.EXISTING_DATA_MIGRATION_QUESTION_PAGE, migrationActions::allDatanodesAvailable);

config.configure(MigrationState.EXISTING_DATA_MIGRATION_QUESTION_PAGE)
.permit(MigrationStep.SHOW_MIGRATE_EXISTING_DATA, MigrationState.MIGRATE_EXISTING_DATA, migrationActions::getElasticsearchHosts)
Expand All @@ -93,7 +94,7 @@ private static StateMachineConfig<MigrationState, MigrationStep> configureStates
.permitIf(MigrationStep.SHOW_ASK_TO_SHUTDOWN_OLD_CLUSTER, MigrationState.ASK_TO_SHUTDOWN_OLD_CLUSTER, migrationActions::isRemoteReindexingFinished);

config.configure(MigrationState.ASK_TO_SHUTDOWN_OLD_CLUSTER)
.permitIf(MigrationStep.CONFIRM_OLD_CLUSTER_STOPPED, MigrationState.FINISHED, migrationActions::isOldClusterStopped, migrationActions::finishRemoteReindexMigration);
.permit(MigrationStep.CONFIRM_OLD_CLUSTER_STOPPED, MigrationState.FINISHED, migrationActions::finishRemoteReindexMigration);

// in place / rolling upgrade branch of the migration
config.configure(MigrationState.ROLLING_UPGRADE_MIGRATION_WELCOME_PAGE)
Expand All @@ -114,11 +115,12 @@ private static StateMachineConfig<MigrationState, MigrationStep> configureStates
.permit(MigrationStep.SHOW_STOP_PROCESSING_PAGE, MigrationState.MESSAGE_PROCESSING_STOP, migrationActions::stopMessageProcessing);

config.configure(MigrationState.MESSAGE_PROCESSING_STOP)
.permit(MigrationStep.SHOW_ROLLING_UPGRADE_ASK_TO_SHUTDOWN_OLD_CLUSTER, MigrationState.RESTART_GRAYLOG, migrationActions::startDataNodes);
.permitIf(MigrationStep.SHOW_ROLLING_UPGRADE_ASK_TO_SHUTDOWN_OLD_CLUSTER, MigrationState.RESTART_GRAYLOG, migrationActions::isOldClusterStopped, migrationActions::startDataNodes);

// shows the "remove connection string, restart graylog"
config.configure(MigrationState.RESTART_GRAYLOG)
.permitIf(MigrationStep.CONFIRM_OLD_CONNECTION_STRING_FROM_CONFIG_REMOVED_AND_GRAYLOG_RESTARTED, MigrationState.FINISHED, migrationActions::dataNodeStartupFinished);
.onEntry(migrationActions::setPreflightFinished)
.permitIf(MigrationStep.CONFIRM_OLD_CONNECTION_STRING_FROM_CONFIG_REMOVED_AND_GRAYLOG_RESTARTED, MigrationState.FINISHED, migrationActions::allDatanodesAvailable);

return config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,15 @@ public MigrationActionsAdapter(MigrationStateMachineContext context) {
}

@Override
public boolean dataNodeStartupFinished() {
public boolean allDatanodesAvailable() {
return false;
}

@Override
public void setPreflightFinished() {

}

@Override
public void startRemoteReindex() {

Expand Down
Loading
Loading