From cdd05a5453483b13a5edb2f63c81d4d4ab3d5da0 Mon Sep 17 00:00:00 2001 From: Johannes Scheuermann Date: Mon, 9 Sep 2024 16:44:25 +0200 Subject: [PATCH] Add recovery plugin clean (#2127) * Add support for multi-region recovery in kubectl-fdb --- docs/manual/operations.md | 3 + e2e/fixtures/fdb_operator_client.go | 1 + e2e/test_operator_ha/operator_ha_test.go | 242 +-------- .../operator_plugin_test.go | 104 +++- internal/pod_client.go | 18 +- internal/pod_client_test.go | 4 +- internal/pod_helper.go | 16 + kubectl-fdb/cmd/analyze.go | 2 +- kubectl-fdb/cmd/exec.go | 15 +- kubectl-fdb/cmd/exec_test.go | 9 +- kubectl-fdb/cmd/k8s_client.go | 53 +- .../cmd/recover_multi_region_cluster.go | 491 ++++++++++++++++++ .../cmd/recover_multi_region_cluster_test.go | 127 +++++ kubectl-fdb/cmd/root.go | 12 +- 14 files changed, 794 insertions(+), 303 deletions(-) create mode 100644 kubectl-fdb/cmd/recover_multi_region_cluster.go create mode 100644 kubectl-fdb/cmd/recover_multi_region_cluster_test.go diff --git a/docs/manual/operations.md b/docs/manual/operations.md index 336585add..ff8313b15 100644 --- a/docs/manual/operations.md +++ b/docs/manual/operations.md @@ -261,6 +261,9 @@ If the cluster is a multi-region cluster, perform this step for all running regi - Now you can set `spec.Skip = false` to let the operator take over again. - Depending on the state of the multi-region cluster, you probably want to change the desired database configuration to drop ha. +The [kubectl-fdb plugin](../../kubectl-fdb/Readme.md) provides a `recover-multi-region-cluster` command that can be used to automatically recover a cluster with the above steps. +The command has some additional safety checks, to ensure the steps are only performed on a cluster that is unhealthy and the majority of coordinators are unreachable. + ## Next You can continue on to the [next section](scaling.md) or go back to the [table of contents](index.md). diff --git a/e2e/fixtures/fdb_operator_client.go b/e2e/fixtures/fdb_operator_client.go index 427d48962..543ebbfec 100644 --- a/e2e/fixtures/fdb_operator_client.go +++ b/e2e/fixtures/fdb_operator_client.go @@ -111,6 +111,7 @@ rules: resources: - pods/exec verbs: + - get - create - apiGroups: - apps.foundationdb.org diff --git a/e2e/test_operator_ha/operator_ha_test.go b/e2e/test_operator_ha/operator_ha_test.go index b4a2a1012..4efd91427 100644 --- a/e2e/test_operator_ha/operator_ha_test.go +++ b/e2e/test_operator_ha/operator_ha_test.go @@ -30,13 +30,8 @@ This cluster will be used for all tests. */ import ( - "context" - "fmt" "log" - "os" - "path" "strconv" - "strings" "time" fdbv1beta2 "github.com/FoundationDB/fdb-kubernetes-operator/api/v1beta2" @@ -52,11 +47,9 @@ import ( ) var ( - factory *fixtures.Factory - fdbCluster *fixtures.HaFdbCluster - testOptions *fixtures.FactoryOptions - clusterConfig *fixtures.ClusterConfig - clusterOptions []fixtures.ClusterOption + factory *fixtures.Factory + fdbCluster *fixtures.HaFdbCluster + testOptions *fixtures.FactoryOptions ) func init() { @@ -65,9 +58,7 @@ func init() { var _ = BeforeSuite(func() { factory = fixtures.CreateFactory(testOptions) - clusterOptions = factory.GetClusterOptions() - clusterConfig = fixtures.DefaultClusterConfigWithHaMode(fixtures.HaFourZoneSingleSat, false) - fdbCluster = factory.CreateFdbHaCluster(clusterConfig, clusterOptions...) + fdbCluster = factory.CreateFdbHaCluster(fixtures.DefaultClusterConfigWithHaMode(fixtures.HaFourZoneSingleSat, false), factory.GetClusterOptions()...) // Load some data into the cluster. 
factory.CreateDataLoaderIfAbsent(fdbCluster.GetPrimary()) @@ -220,229 +211,4 @@ var _ = Describe("Operator HA tests", Label("e2e", "pr"), func() { }).WithTimeout(10 * time.Minute).WithPolling(2 * time.Second).Should(BeNumerically(">=", desiredRunningPods)) }) }) - - When("all Pods in the primary and primary satellite are down", func() { - BeforeEach(func() { - // This tests is a destructive test where the cluster will stop working for some period. - availabilityCheck = false - primary := fdbCluster.GetPrimary() - primary.SetSkipReconciliation(true) - - primarySatellite := fdbCluster.GetPrimarySatellite() - primarySatellite.SetSkipReconciliation(true) - - primaryPods := primary.GetPods() - for _, pod := range primaryPods.Items { - factory.DeletePod(&pod) - } - - primarySatellitePods := primarySatellite.GetPods() - for _, pod := range primarySatellitePods.Items { - factory.DeletePod(&pod) - } - - // Wait a short amount of time to let the cluster see that the primary and primary satellite is down. - time.Sleep(5 * time.Second) - }) - - AfterEach(func() { - // Delete the broken cluster. - fdbCluster.Delete() - // Recreate the cluster to make sure the next tests can proceed - fdbCluster = factory.CreateFdbHaCluster(clusterConfig, clusterOptions...) - // Load some data into the cluster. - factory.CreateDataLoaderIfAbsent(fdbCluster.GetPrimary()) - }) - - It("should recover the coordinators", func() { - // Set all the `FoundationDBCluster` resources for this FDB cluster to `spec.Skip = true` to make sure the operator is not changing the manual changed state. - remote := fdbCluster.GetRemote() - remote.SetSkipReconciliation(true) - remoteSatellite := fdbCluster.GetRemoteSatellite() - remoteSatellite.SetSkipReconciliation(true) - - // Fetch the last connection string from the `FoundationDBCluster` status, e.g. `kubectl get fdb ${cluster} -o jsonpath='{ .status.connectionString }'`. - lastConnectionString := remote.GetCluster().Status.ConnectionString - lastConnectionStringParts := strings.Split(lastConnectionString, "@") - addresses := strings.Split(lastConnectionStringParts[1], ",") - // Since this is a multi-region cluster, we expect 9 coordinators. - Expect(addresses).To(HaveLen(9)) - - log.Println("lastConnectionString", lastConnectionString) - - var useTLS bool - coordinators := map[string]fdbv1beta2.ProcessAddress{} - for _, addr := range addresses { - parsed, err := fdbv1beta2.ParseProcessAddress(addr) - Expect(err).NotTo(HaveOccurred()) - log.Println("found coordinator", parsed.String()) - coordinators[parsed.MachineAddress()] = parsed - // If the tls flag is present we assume that the coordinators should make use of TLS. 
- _, useTLS = parsed.Flags["tls"] - } - - log.Println("coordinators", coordinators, "useTLS", useTLS) - runningCoordinators := map[string]fdbv1beta2.None{} - var runningCoordinator *corev1.Pod - newCoordinators := make([]fdbv1beta2.ProcessAddress, 0, 5) - remotePods := remote.GetPods() - candidates := make([]corev1.Pod, 0, len(remotePods.Items)) - for _, pod := range remotePods.Items { - addr, err := fdbv1beta2.ParseProcessAddress(pod.Status.PodIP) - Expect(err).NotTo(HaveOccurred()) - if coordinatorAddr, ok := coordinators[addr.MachineAddress()]; ok { - log.Println("Found coordinator for remote", pod.Name, "address", coordinatorAddr.String()) - runningCoordinators[addr.MachineAddress()] = fdbv1beta2.None{} - newCoordinators = append(newCoordinators, coordinatorAddr) - if runningCoordinator == nil { - loopPod := pod - runningCoordinator = &loopPod - } - continue - } - - if !fixtures.GetProcessClass(pod).IsTransaction() { - continue - } - - candidates = append(candidates, pod) - } - - remoteSatellitePods := remoteSatellite.GetPods() - for _, pod := range remoteSatellitePods.Items { - addr, err := fdbv1beta2.ParseProcessAddress(pod.Status.PodIP) - Expect(err).NotTo(HaveOccurred()) - if coordinatorAddr, ok := coordinators[addr.MachineAddress()]; ok { - log.Println("Found coordinator for remote satellite", pod.Name, "address", addr.MachineAddress()) - runningCoordinators[addr.MachineAddress()] = fdbv1beta2.None{} - newCoordinators = append(newCoordinators, coordinatorAddr) - if runningCoordinator == nil { - loopPod := pod - runningCoordinator = &loopPod - } - continue - } - - if !fixtures.GetProcessClass(pod).IsTransaction() { - continue - } - - candidates = append(candidates, pod) - } - - // Pick 5 new coordinators. - needsUpload := make([]corev1.Pod, 0, 5) - idx := 0 - for len(newCoordinators) < 5 { - fmt.Println("Current coordinators:", len(newCoordinators)) - candidate := candidates[idx] - addr, err := fdbv1beta2.ParseProcessAddress(candidate.Status.PodIP) - Expect(err).NotTo(HaveOccurred()) - fmt.Println("Adding pod as new coordinators:", candidate.Name) - if useTLS { - addr.Port = 4500 - addr.Flags = map[string]bool{"tls": true} - } else { - addr.Port = 4501 - } - newCoordinators = append(newCoordinators, addr) - needsUpload = append(needsUpload, candidate) - idx++ - } - - // Copy the coordinator state from one of the running coordinators to your local machine: - coordinatorFiles := []string{"coordination-0.fdq", "coordination-1.fdq"} - tmpCoordinatorFiles := make([]string, 2) - tmpDir := GinkgoT().TempDir() - for idx, coordinatorFile := range coordinatorFiles { - tmpCoordinatorFiles[idx] = path.Join(tmpDir, coordinatorFile) - } - - log.Println("tmpCoordinatorFiles", tmpCoordinatorFiles) - stdout, stderr, err := factory.ExecuteCmdOnPod(context.Background(), runningCoordinator, fdbv1beta2.MainContainerName, "find /var/fdb/data/ -type f -name 'coordination-0.fdq'", true) - Expect(err).NotTo(HaveOccurred()) - Expect(stderr).To(BeEmpty()) - - dataDir := path.Dir(strings.TrimSpace(stdout)) - log.Println("find result:", stdout, ",dataDir", dataDir) - for idx, coordinatorFile := range coordinatorFiles { - tmpCoordinatorFile, err := os.OpenFile(tmpCoordinatorFiles[idx], os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0600) - Expect(err).NotTo(HaveOccurred()) - - log.Println("Download files, target:", tmpCoordinatorFiles[idx], "source", path.Join(dataDir, coordinatorFile), "pod", runningCoordinator.Name, "namespace", runningCoordinator.Namespace) - err = factory.DownloadFile(context.Background(), 
runningCoordinator, fdbv1beta2.MainContainerName, path.Join(dataDir, coordinatorFile), tmpCoordinatorFile) - Expect(err).NotTo(HaveOccurred()) - Expect(tmpCoordinatorFile.Close()).NotTo(HaveOccurred()) - - fileInfo, err := os.Stat(tmpCoordinatorFiles[idx]) - Expect(err).NotTo(HaveOccurred()) - Expect(fileInfo.Size()).To(BeNumerically(">", 0)) - } - - for _, target := range needsUpload { - for idx, coordinatorFile := range coordinatorFiles { - tmpCoordinatorFile, err := os.OpenFile(tmpCoordinatorFiles[idx], os.O_RDONLY, 0600) - Expect(err).NotTo(HaveOccurred()) - - log.Println("Upload files, source:", tmpCoordinatorFile.Name(), "target", path.Join(dataDir, coordinatorFile), "pod", target.Name, "namespace", target.Namespace) - err = factory.UploadFile(context.Background(), &target, fdbv1beta2.MainContainerName, tmpCoordinatorFile, path.Join(dataDir, coordinatorFile)) - Expect(err).NotTo(HaveOccurred()) - Expect(tmpCoordinatorFile.Close()).NotTo(HaveOccurred()) - } - } - - // Update the `ConfigMap` to contain the new connection string, the new connection string must contain the still existing coordinators and the new coordinators. The old entries must be removed. - var newConnectionString strings.Builder - newConnectionString.WriteString(lastConnectionStringParts[0]) - newConnectionString.WriteString("@") - for idx, coordinator := range newCoordinators { - newConnectionString.WriteString(coordinator.String()) - if idx == len(newCoordinators)-1 { - break - } - - newConnectionString.WriteString(",") - } - - newCS := newConnectionString.String() - log.Println("new connection string:", newCS) - for _, cluster := range fdbCluster.GetAllClusters() { - cluster.UpdateConnectionString(newCS) - } - - // Wait ~1 min until the `ConfigMap` is synced to all Pods, you can check the `/var/dynamic-conf/fdb.cluster` inside a Pod if you are unsure. - time.Sleep(2 * time.Minute) - - log.Println("Kill fdbserver processes") - - debugOutput := true - // Now all Pods must be restarted and the previous local cluster file must be deleted to make sure the fdbserver is picking the connection string from the seed cluster file (`/var/dynamic-conf/fdb.cluster`). - for _, pod := range remote.GetPods().Items { - _, _, err := factory.ExecuteCmd(context.Background(), pod.Namespace, pod.Name, fdbv1beta2.MainContainerName, "pkill fdbserver && rm -f /var/fdb/data/fdb.cluster && pkill fdbserver || true", debugOutput) - Expect(err).NotTo(HaveOccurred()) - } - - for _, pod := range remoteSatellite.GetPods().Items { - _, _, err := factory.ExecuteCmd(context.Background(), pod.Namespace, pod.Name, fdbv1beta2.MainContainerName, "pkill fdbserver && rm -f /var/fdb/data/fdb.cluster && pkill fdbserver || true", debugOutput) - Expect(err).NotTo(HaveOccurred()) - } - - log.Println("force recovery") - // Now you can exec into a container and use `fdbcli` to connect to the cluster. - // If you use a multi-region cluster you have to issue `force_recovery_with_data_loss` - _, _, err = remote.RunFdbCliCommandInOperatorWithoutRetry(fmt.Sprintf("force_recovery_with_data_loss %s", remote.GetCluster().Spec.DataCenter), true, 40) - Expect(err).NotTo(HaveOccurred()) - - // Now you can set `spec.Skip = false` to let the operator take over again. - remote.SetSkipReconciliation(false) - remoteSatellite.SetSkipReconciliation(false) - - // Ensure the cluster is available again. 
- Eventually(func() bool { - return remote.GetStatus().Client.DatabaseStatus.Available - }).WithTimeout(2 * time.Minute).WithPolling(1 * time.Second).Should(BeTrue()) - - // TODO (johscheuer): Add additional testing for different cases. - }) - }) }) diff --git a/e2e/test_operator_plugin/operator_plugin_test.go b/e2e/test_operator_plugin/operator_plugin_test.go index 85a75f713..7430f8062 100644 --- a/e2e/test_operator_plugin/operator_plugin_test.go +++ b/e2e/test_operator_plugin/operator_plugin_test.go @@ -33,12 +33,15 @@ import ( "github.com/FoundationDB/fdb-kubernetes-operator/e2e/fixtures" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "golang.org/x/sync/errgroup" ) var ( - factory *fixtures.Factory - fdbCluster *fixtures.FdbCluster - testOptions *fixtures.FactoryOptions + factory *fixtures.Factory + fdbCluster *fixtures.HaFdbCluster + testOptions *fixtures.FactoryOptions + clusterConfig *fixtures.ClusterConfig + clusterOptions []fixtures.ClusterOption ) func init() { @@ -47,10 +50,9 @@ func init() { var _ = BeforeSuite(func() { factory = fixtures.CreateFactory(testOptions) - fdbCluster = factory.CreateFdbCluster( - fixtures.DefaultClusterConfig(false), - factory.GetClusterOptions()..., - ) + clusterOptions = factory.GetClusterOptions() + clusterConfig = fixtures.DefaultClusterConfigWithHaMode(fixtures.HaFourZoneSingleSat, false) + fdbCluster = factory.CreateFdbHaCluster(clusterConfig, clusterOptions...) }) var _ = AfterSuite(func() { @@ -63,26 +65,96 @@ var _ = AfterSuite(func() { var _ = Describe("Operator Plugin", Label("e2e", "pr"), func() { AfterEach(func() { if CurrentSpecReport().Failed() { - factory.DumpState(fdbCluster) + factory.DumpStateHaCluster(fdbCluster) } - Expect(fdbCluster.WaitForReconciliation()).ToNot(HaveOccurred()) - factory.StopInvariantCheck() - // Make sure all data is present in the cluster - fdbCluster.EnsureTeamTrackersAreHealthy() - fdbCluster.EnsureTeamTrackersHaveMinReplicas() }) When("getting the plugin version from the operator pod", func() { It("should print the version", func() { // Pick one operator pod and execute the kubectl version command to ensure that kubectl-fdb is present // and can be executed. - operatorPod := factory.RandomPickOnePod(factory.GetOperatorPods(fdbCluster.Namespace()).Items) - log.Println("operatorPod", operatorPod.Name) + operatorPod := factory.RandomPickOnePod(factory.GetOperatorPods(fdbCluster.GetPrimary().Namespace()).Items) + log.Println("operatorPod:", operatorPod.Name) Eventually(func(g Gomega) string { - stdout, stderr, err := factory.ExecuteCmdOnPod(context.Background(), &operatorPod, "manager", fmt.Sprintf("kubectl-fdb -n %s version", fdbCluster.Namespace()), false) + stdout, stderr, err := factory.ExecuteCmdOnPod(context.Background(), &operatorPod, "manager", fmt.Sprintf("kubectl-fdb -n %s --version-check=false version", fdbCluster.GetPrimary().Namespace()), false) g.Expect(err).NotTo(HaveOccurred(), stderr) return stdout }).WithTimeout(10 * time.Minute).WithPolling(2 * time.Second).Should(And(ContainSubstring("kubectl-fdb:"), ContainSubstring("foundationdb-operator:"))) }) }) + + When("all Pods in the primary and satellites are down", func() { + BeforeEach(func() { + // This tests is a destructive test where the cluster will stop working for some period. 
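+ // Set spec.Skip on the affected clusters before deleting their Pods, otherwise the operator would
+ // recreate the deleted Pods right away and the outage could not be simulated.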
+ primary := fdbCluster.GetPrimary() + primary.SetSkipReconciliation(true) + + primarySatellite := fdbCluster.GetPrimarySatellite() + primarySatellite.SetSkipReconciliation(true) + + remoteSatellite := fdbCluster.GetRemoteSatellite() + remoteSatellite.SetSkipReconciliation(true) + + var wg errgroup.Group + log.Println("Delete Pods in primary") + wg.Go(func() error { + for _, pod := range primary.GetPods().Items { + factory.DeletePod(&pod) + } + + return nil + }) + + log.Println("Delete Pods in primary satellite") + wg.Go(func() error { + for _, pod := range primarySatellite.GetPods().Items { + factory.DeletePod(&pod) + } + + return nil + }) + + log.Println("Delete Pods in remote satellite") + wg.Go(func() error { + for _, pod := range remoteSatellite.GetPods().Items { + factory.DeletePod(&pod) + } + + return nil + }) + + Expect(wg.Wait()).NotTo(HaveOccurred()) + // Wait a short amount of time to let the cluster see that the primary and primary satellite is down. + time.Sleep(5 * time.Second) + + remote := fdbCluster.GetRemote() + // Ensure the cluster is unavailable. + Eventually(func() bool { + return remote.GetStatus().Client.DatabaseStatus.Available + }).WithTimeout(2 * time.Minute).WithPolling(1 * time.Second).Should(BeFalse()) + }) + + AfterEach(func() { + log.Println("Recreate cluster") + // Delete the broken cluster. + fdbCluster.Delete() + // Recreate the cluster to make sure the next tests can proceed + fdbCluster = factory.CreateFdbHaCluster(clusterConfig, clusterOptions...) + }) + + It("should recover the coordinators", func() { + remote := fdbCluster.GetRemote() + // Pick one operator pod and execute the recovery command + operatorPod := factory.RandomPickOnePod(factory.GetOperatorPods(remote.Namespace()).Items) + log.Println("operatorPod:", operatorPod.Name) + stdout, stderr, err := factory.ExecuteCmdOnPod(context.Background(), &operatorPod, "manager", fmt.Sprintf("kubectl-fdb -n %s recover-multi-region-cluster --version-check=false --wait=false %s", remote.Namespace(), remote.Name()), false) + log.Println("stdout:", stdout, "stderr:", stderr) + Expect(err).NotTo(HaveOccurred()) + + // Ensure the cluster is available again. + Eventually(func() bool { + return remote.GetStatus().Client.DatabaseStatus.Available + }).WithTimeout(2 * time.Minute).WithPolling(1 * time.Second).Should(BeTrue()) + }) + }) }) diff --git a/internal/pod_client.go b/internal/pod_client.go index 1e0aa8912..c43ab5dfe 100644 --- a/internal/pod_client.go +++ b/internal/pod_client.go @@ -105,7 +105,7 @@ func NewFdbPodClient(cluster *fdbv1beta2.FoundationDBCluster, pod *corev1.Pod, l } } - useTLS := podHasSidecarTLS(pod) + useTLS := PodHasSidecarTLS(pod) var tlsConfig = &tls.Config{} if useTLS { @@ -375,22 +375,6 @@ func (client *realFdbPodAnnotationClient) IsPresent(_ string) (bool, error) { return true, nil } -// podHasSidecarTLS determines whether a pod currently has TLS enabled for the -// sidecar process. -func podHasSidecarTLS(pod *corev1.Pod) bool { - for _, container := range pod.Spec.Containers { - if container.Name == fdbv1beta2.SidecarContainerName { - for _, arg := range container.Args { - if arg == "--tls" { - return true - } - } - } - } - - return false -} - // GetImageType determines whether a pod is using the unified or the split image. 
func GetImageType(pod *corev1.Pod) fdbv1beta2.ImageType { return GetImageTypeFromAnnotation(pod.Annotations) diff --git a/internal/pod_client_test.go b/internal/pod_client_test.go index 8b03e7865..a1d16d72f 100644 --- a/internal/pod_client_test.go +++ b/internal/pod_client_test.go @@ -48,7 +48,7 @@ var _ = Describe("pod_client", func() { It("should not have TLS sidecar TLS", func() { pod, err := GetPod(cluster, GetProcessGroup(cluster, fdbv1beta2.ProcessClassStorage, 1)) Expect(err).NotTo(HaveOccurred()) - Expect(podHasSidecarTLS(pod)).To(BeFalse()) + Expect(PodHasSidecarTLS(pod)).To(BeFalse()) }) }) @@ -60,7 +60,7 @@ var _ = Describe("pod_client", func() { It("should have TLS sidecar TLS", func() { pod, err := GetPod(cluster, GetProcessGroup(cluster, fdbv1beta2.ProcessClassStorage, 1)) Expect(err).NotTo(HaveOccurred()) - Expect(podHasSidecarTLS(pod)).To(BeTrue()) + Expect(PodHasSidecarTLS(pod)).To(BeTrue()) }) }) diff --git a/internal/pod_helper.go b/internal/pod_helper.go index 952e03983..aef3558f8 100644 --- a/internal/pod_helper.go +++ b/internal/pod_helper.go @@ -236,3 +236,19 @@ func GetPublicIPSource(pod *corev1.Pod) (fdbv1beta2.PublicIPSource, error) { } return fdbv1beta2.PublicIPSource(source), nil } + +// PodHasSidecarTLS determines whether a pod currently has TLS enabled for the sidecar process. +// This method should only be used for split images. +func PodHasSidecarTLS(pod *corev1.Pod) bool { + for _, container := range pod.Spec.Containers { + if container.Name == fdbv1beta2.SidecarContainerName { + for _, arg := range container.Args { + if arg == "--tls" { + return true + } + } + } + } + + return false +} diff --git a/kubectl-fdb/cmd/analyze.go b/kubectl-fdb/cmd/analyze.go index e7567f818..7d7532cd7 100644 --- a/kubectl-fdb/cmd/analyze.go +++ b/kubectl-fdb/cmd/analyze.go @@ -466,7 +466,7 @@ func filterDeletePods(replacements []string, killPods []corev1.Pod) []corev1.Pod } func getStatus(ctx context.Context, kubeClient client.Client, restConfig *rest.Config, pod *corev1.Pod) (*fdbv1beta2.FoundationDBStatus, error) { - stdout, stderr, err := kubeHelper.ExecuteCommandOnPod(ctx, kubeClient, restConfig, pod, fdbv1beta2.MainContainerName, "fdbcli --exec 'status json'", false) + stdout, stderr, err := kubeHelper.ExecuteCommandOnPod(ctx, kubeClient, restConfig, pod, fdbv1beta2.MainContainerName, "fdbcli --timeout=40 --exec 'status json'", false) if err != nil { return nil, fmt.Errorf("error getting status: %s, %w", stderr, err) } diff --git a/kubectl-fdb/cmd/exec.go b/kubectl-fdb/cmd/exec.go index ecd3278ca..5fe180821 100644 --- a/kubectl-fdb/cmd/exec.go +++ b/kubectl-fdb/cmd/exec.go @@ -21,9 +21,7 @@ package cmd import ( - "context" "log" - "strings" fdbv1beta2 "github.com/FoundationDB/fdb-kubernetes-operator/api/v1beta2" kubeHelper "github.com/FoundationDB/fdb-kubernetes-operator/internal/kubernetes" @@ -66,7 +64,7 @@ func newExecCmd(streams genericclioptions.IOStreams) *cobra.Command { return err } - return runExec(cmd.Context(), kubeClient, cluster, config, args) + return runExec(cmd, kubeClient, cluster, config, args) }, Example: ` # Open a shell. 
@@ -91,8 +89,8 @@ func newExecCmd(streams genericclioptions.IOStreams) *cobra.Command { return cmd } -func runExec(ctx context.Context, kubeClient client.Client, cluster *fdbv1beta2.FoundationDBCluster, config *rest.Config, commandArgs []string) error { - pods, err := getRunningPodsForCluster(ctx, kubeClient, cluster) +func runExec(cmd *cobra.Command, kubeClient client.Client, cluster *fdbv1beta2.FoundationDBCluster, config *rest.Config, commandArgs []string) error { + pods, err := getRunningPodsForCluster(cmd.Context(), kubeClient, cluster) if err != nil { return err } @@ -102,10 +100,5 @@ func runExec(ctx context.Context, kubeClient client.Client, cluster *fdbv1beta2. return err } - _, stderr, err := kubeHelper.ExecuteCommandOnPod(ctx, kubeClient, config, clientPod, fdbv1beta2.MainContainerName, strings.Join(commandArgs, " "), false) - if err != nil { - log.Println(stderr) - } - - return err + return kubeHelper.ExecuteCommandRaw(cmd.Context(), kubeClient, config, clientPod.Namespace, clientPod.Name, fdbv1beta2.MainContainerName, commandArgs, cmd.InOrStdin(), cmd.OutOrStdout(), cmd.OutOrStderr(), true) } diff --git a/kubectl-fdb/cmd/exec_test.go b/kubectl-fdb/cmd/exec_test.go index a930375dc..08440a665 100644 --- a/kubectl-fdb/cmd/exec_test.go +++ b/kubectl-fdb/cmd/exec_test.go @@ -21,7 +21,9 @@ package cmd import ( + "bytes" "context" + "k8s.io/cli-runtime/pkg/genericclioptions" fdbv1beta2 "github.com/FoundationDB/fdb-kubernetes-operator/api/v1beta2" . "github.com/onsi/ginkgo/v2" @@ -56,7 +58,12 @@ var _ = Describe("[plugin] exec command", func() { DescribeTable("should execute the provided command", func(input testCase) { - Expect(runExec(context.Background(), k8sClient, cluster, &rest.Config{}, input.Command)).NotTo(HaveOccurred()) + outBuffer := bytes.Buffer{} + errBuffer := bytes.Buffer{} + inBuffer := bytes.Buffer{} + + rootCmd := NewRootCmd(genericclioptions.IOStreams{In: &inBuffer, Out: &outBuffer, ErrOut: &errBuffer}, &MockVersionChecker{}) + Expect(runExec(rootCmd, k8sClient, cluster, &rest.Config{}, input.Command)).NotTo(HaveOccurred()) }, Entry("Exec into instance with valid pod", testCase{ diff --git a/kubectl-fdb/cmd/k8s_client.go b/kubectl-fdb/cmd/k8s_client.go index 553622c9f..2f3dad243 100644 --- a/kubectl-fdb/cmd/k8s_client.go +++ b/kubectl-fdb/cmd/k8s_client.go @@ -24,13 +24,13 @@ import ( "context" "errors" "fmt" - "io" - "math/rand" - "strings" - fdbv1beta1 "github.com/FoundationDB/fdb-kubernetes-operator/api/v1beta1" fdbv1beta2 "github.com/FoundationDB/fdb-kubernetes-operator/api/v1beta2" "github.com/FoundationDB/fdb-kubernetes-operator/internal" + "io" + "k8s.io/client-go/rest" + "math/rand" + "strings" "github.com/go-logr/logr" "github.com/spf13/cobra" @@ -56,6 +56,15 @@ func getKubeClient(ctx context.Context, o *fdbBOptions) (client.Client, error) { return nil, err } + namespace, err := getNamespace(*o.configFlags.Namespace) + if err != nil { + return nil, err + } + + return setupKubeClient(ctx, config, namespace) +} + +func setupKubeClient(ctx context.Context, config *rest.Config, namespace string) (client.Client, error) { scheme := runtime.NewScheme() utilruntime.Must(clientgoscheme.AddToScheme(scheme)) utilruntime.Must(fdbv1beta1.AddToScheme(scheme)) @@ -74,11 +83,6 @@ func getKubeClient(ctx context.Context, o *fdbBOptions) (client.Client, error) { return nil, err } - namespace, err := getNamespace(*o.configFlags.Namespace) - if err != nil { - return nil, err - } - cacheBuilder := 
cache.MultiNamespacedCacheBuilder([]string{namespace}) internalCache, err := cacheBuilder(config, cache.Options{ Scheme: scheme, @@ -582,3 +586,34 @@ func getProcessGroupsByCluster(cmd *cobra.Command, kubeClient client.Client, opt cluster: processGroupIDs, }, nil } + +func setSkipReconciliation(ctx context.Context, kubeClient client.Client, cluster *fdbv1beta2.FoundationDBCluster, skip bool) error { + patch := client.MergeFrom(cluster.DeepCopy()) + cluster.Spec.Skip = skip + return kubeClient.Patch(ctx, cluster, patch) +} + +func updateConnectionString(ctx context.Context, kubeClient client.Client, cluster *fdbv1beta2.FoundationDBCluster, connectionString string) error { + patch := client.MergeFrom(cluster.DeepCopy()) + cluster.Status.ConnectionString = connectionString + err := kubeClient.Status().Patch(ctx, cluster, patch) + if err != nil { + return err + } + + // In addition to the FoundationDBCluster also update the ConfigMap. + cm := &corev1.ConfigMap{} + err = kubeClient.Get(ctx, client.ObjectKey{Namespace: cluster.Namespace, Name: cluster.Name + "-config"}, cm) + if err != nil { + return err + } + + cm.Data[fdbv1beta2.ClusterFileKey] = connectionString + return kubeClient.Update(ctx, cm) +} + +func updateDatabaseConfiguration(ctx context.Context, kubeClient client.Client, cluster *fdbv1beta2.FoundationDBCluster, configuration fdbv1beta2.DatabaseConfiguration) error { + patch := client.MergeFrom(cluster.DeepCopy()) + cluster.Spec.DatabaseConfiguration = configuration + return kubeClient.Patch(ctx, cluster, patch) +} diff --git a/kubectl-fdb/cmd/recover_multi_region_cluster.go b/kubectl-fdb/cmd/recover_multi_region_cluster.go new file mode 100644 index 000000000..7aec77610 --- /dev/null +++ b/kubectl-fdb/cmd/recover_multi_region_cluster.go @@ -0,0 +1,491 @@ +/* + * recover_multi_region_cluster.go + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2021-2024 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "context" + "fmt" + "os" + "path" + "strings" + "time" + + fdbv1beta2 "github.com/FoundationDB/fdb-kubernetes-operator/api/v1beta2" + "github.com/FoundationDB/fdb-kubernetes-operator/internal" + kubeHelper "github.com/FoundationDB/fdb-kubernetes-operator/internal/kubernetes" + "github.com/spf13/cobra" + corev1 "k8s.io/api/core/v1" + "k8s.io/cli-runtime/pkg/genericclioptions" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// recoverMultiRegionClusterOpts struct to pass down all args to the actual runner. 
+type recoverMultiRegionClusterOpts struct {
+ client client.Client
+ config *rest.Config
+ clusterName string
+ namespace string
+}
+
+func newRecoverMultiRegionClusterCmd(streams genericclioptions.IOStreams) *cobra.Command {
+ o := newFDBOptions(streams)
+
+ cmd := &cobra.Command{
+ Use: "recover-multi-region-cluster",
+ Short: "Recover a multi-region cluster if a majority of coordinators is lost permanently",
+ Long: "Recover a multi-region cluster if a majority of coordinators is lost permanently",
+ RunE: func(cmd *cobra.Command, args []string) error {
+ wait, err := cmd.Root().Flags().GetBool("wait")
+ if err != nil {
+ return err
+ }
+
+ if len(args) != 1 {
+ return fmt.Errorf("exactly one cluster name must be specified, provided args: %v", args)
+ }
+
+ clusterName := args[0]
+
+ kubeClient, err := getKubeClient(cmd.Context(), o)
+ if err != nil {
+ return err
+ }
+
+ namespace, err := getNamespace(*o.configFlags.Namespace)
+ if err != nil {
+ return err
+ }
+
+ config, err := o.configFlags.ToRESTConfig()
+ if err != nil {
+ return err
+ }
+
+ if wait {
+ confirmed := confirmAction(fmt.Sprintf("WARNING:\nThe cluster: %s/%s will be force recovered.\nOnly perform those steps if you are unable to recover the coordinator state.\nPerforming this action can lead to data loss.",
+ namespace, clusterName))
+ if !confirmed {
+ return fmt.Errorf("aborted recover multi-region action")
+ }
+
+ confirmed = confirmAction(fmt.Sprintf("WARNING:\nIf this is a multi-region cluster, or one spread across different namespaces/Kubernetes clusters:\nEnsure that all Pods of this FDB cluster: %s in the other namespaces/Kubernetes clusters are deleted and shut down.", clusterName))
+ if !confirmed {
+ return fmt.Errorf("aborted recover multi-region action")
+ }
+ }
+
+ return recoverMultiRegionCluster(cmd,
+ recoverMultiRegionClusterOpts{
+ client: kubeClient,
+ config: config,
+ clusterName: clusterName,
+ namespace: namespace,
+ })
+ },
+ Example: `
+# Recover the multi-region cluster "sample-cluster-1" in the current namespace
+kubectl fdb recover-multi-region-cluster sample-cluster-1
+
+# Recover the multi-region cluster "sample-cluster-1" in the "testing" namespace
+kubectl fdb recover-multi-region-cluster -n testing sample-cluster-1
+`,
+ }
+ cmd.SetOut(o.Out)
+ cmd.SetErr(o.ErrOut)
+ cmd.SetIn(o.In)
+
+ o.configFlags.AddFlags(cmd.Flags())
+
+ return cmd
+}
+
+// recoverMultiRegionCluster will forcefully recover a multi-region cluster if a majority of coordinators are lost.
+// Performing this action can result in data loss.
+func recoverMultiRegionCluster(cmd *cobra.Command, opts recoverMultiRegionClusterOpts) error {
+ cluster := &fdbv1beta2.FoundationDBCluster{}
+ err := opts.client.Get(cmd.Context(), client.ObjectKey{Name: opts.clusterName, Namespace: opts.namespace}, cluster)
+ if err != nil {
+ return err
+ }
+
+ err = checkIfClusterIsUnavailableAndMajorityOfCoordinatorsAreUnreachable(cmd, opts.client, opts.config, cluster)
+ if err != nil {
+ return err
+ }
+
+ // Skip the cluster, make sure the operator is not taking any action on the cluster.
+ err = setSkipReconciliation(cmd.Context(), opts.client, cluster, true)
+ if err != nil {
+ return err
+ }
+
+ // Fetch the last connection string from the `FoundationDBCluster` status, e.g. `kubectl get fdb ${cluster} -o jsonpath='{ .status.connectionString }'`.
+ lastConnectionString := cluster.Status.ConnectionString + lastConnectionStringParts := strings.Split(lastConnectionString, "@") + addresses := strings.Split(lastConnectionStringParts[1], ",") + + cmd.Println("current connection string", lastConnectionString) + + var useTLS bool + coordinators := map[string]fdbv1beta2.ProcessAddress{} + for _, addr := range addresses { + parsed, parseErr := fdbv1beta2.ParseProcessAddress(addr) + if parseErr != nil { + return parseErr + } + + cmd.Println("found coordinator", parsed.String()) + coordinators[parsed.MachineAddress()] = parsed + // If the tls flag is present we assume that the coordinators should make use of TLS. + _, useTLS = parsed.Flags["tls"] + } + + cmd.Println("current coordinators", coordinators, "useTLS", useTLS) + // Fetch all Pods and coordinators for the remote and remote satellite. + runningCoordinators := map[string]fdbv1beta2.None{} + newCoordinators := make([]fdbv1beta2.ProcessAddress, 0, 5) + processCounts, err := cluster.GetProcessCountsWithDefaults() + if err != nil { + return err + } + candidates := make([]*corev1.Pod, 0, processCounts.Total()) + + pods, err := getRunningPodsForCluster(cmd.Context(), opts.client, cluster) + if err != nil { + return err + } + + // Find a running coordinator to copy the coordinator files from. + var runningCoordinator *corev1.Pod + for _, pod := range pods.Items { + addr, parseErr := fdbv1beta2.ParseProcessAddress(pod.Status.PodIP) + if parseErr != nil { + return parseErr + } + + loopPod := pod + if coordinatorAddr, ok := coordinators[addr.MachineAddress()]; ok { + cmd.Println("Found coordinator for cluster", pod.Name, "address", addr.MachineAddress()) + runningCoordinators[addr.MachineAddress()] = fdbv1beta2.None{} + newCoordinators = append(newCoordinators, coordinatorAddr) + + runningCoordinator = &loopPod + continue + } + + if !internal.GetProcessClassFromMeta(cluster, pod.ObjectMeta).IsStateful() { + continue + } + + candidates = append(candidates, &loopPod) + } + + if runningCoordinator == nil { + return fmt.Errorf("could not find any running coordinator for this cluster") + } + + // Drop the multi-region setup if present. + newDatabaseConfiguration := cluster.Spec.DatabaseConfiguration.DeepCopy() + // Drop the multi-region configuration. + newDatabaseConfiguration.UsableRegions = 1 + newDatabaseConfiguration.Regions = []fdbv1beta2.Region{ + { + DataCenters: []fdbv1beta2.DataCenter{ + { + ID: cluster.Spec.DataCenter, + }, + }, + }, + } + cmd.Println("Update the database configuration to single region configuration") + err = updateDatabaseConfiguration(cmd.Context(), opts.client, cluster, *newDatabaseConfiguration) + if err != nil { + return err + } + + // Pick 5 new coordinators. + needsUpload := make([]*corev1.Pod, 0, cluster.DesiredCoordinatorCount()) + for len(newCoordinators) < cluster.DesiredCoordinatorCount() { + cmd.Println("Current coordinators:", len(newCoordinators)) + candidate := candidates[len(newCoordinators)] + addr, parseErr := fdbv1beta2.ParseProcessAddress(candidate.Status.PodIP) + if parseErr != nil { + return parseErr + } + cmd.Println("Adding pod as new coordinators:", candidate.Name) + if useTLS { + addr.Port = 4500 + addr.Flags = map[string]bool{"tls": true} + } else { + addr.Port = 4501 + } + newCoordinators = append(newCoordinators, addr) + needsUpload = append(needsUpload, candidate) + } + + // If at least one coordinator needs to get the files uploaded, we perform the download and upload for the coordinators. 
+ if len(needsUpload) > 0 { + // Copy the coordinator state from one of the running coordinators to your local machine: + coordinatorFiles := []string{"coordination-0.fdq", "coordination-1.fdq"} + tmpCoordinatorFiles := make([]string, 2) + tmpDir := os.TempDir() + for idx, coordinatorFile := range coordinatorFiles { + tmpCoordinatorFiles[idx] = path.Join(tmpDir, coordinatorFile) + } + + cmd.Println("tmpCoordinatorFiles", tmpCoordinatorFiles, "checking the location of the coordination-0.fdq in Pod", runningCoordinator.Name) + stdout, stderr, err := kubeHelper.ExecuteCommandOnPod(context.Background(), opts.client, opts.config, runningCoordinator, fdbv1beta2.MainContainerName, "find /var/fdb/data/ -type f -name 'coordination-0.fdq' -print -quit | head -n 1", false) + if err != nil { + cmd.Println(stderr) + return err + } + + lines := strings.Split(stdout, "\n") + if len(lines) == 0 { + return fmt.Errorf("no coordination file found in %s", runningCoordinator.Name) + } + + dataDir := path.Dir(strings.TrimSpace(lines[0])) + cmd.Println("dataDir:", dataDir) + for idx, coordinatorFile := range coordinatorFiles { + err = downloadCoordinatorFile(cmd, opts.client, opts.config, runningCoordinator, path.Join(dataDir, coordinatorFile), tmpCoordinatorFiles[idx]) + if err != nil { + return err + } + } + + for _, target := range needsUpload { + targetDataDir := getDataDir(dataDir, target, cluster) + + for idx, coordinatorFile := range coordinatorFiles { + err = uploadCoordinatorFile(cmd, opts.client, opts.config, target, tmpCoordinatorFiles[idx], path.Join(targetDataDir, coordinatorFile)) + if err != nil { + return err + } + } + } + } + + // Update the `ConfigMap` to contain the new connection string, the new connection string must contain the still existing coordinators and the new coordinators. The old entries must be removed. + var newConnectionString strings.Builder + newConnectionString.WriteString(lastConnectionStringParts[0]) + newConnectionString.WriteString("@") + for idx, coordinator := range newCoordinators { + newConnectionString.WriteString(coordinator.String()) + if idx == len(newCoordinators)-1 { + break + } + + newConnectionString.WriteString(",") + } + + newCS := newConnectionString.String() + cmd.Println("new connection string:", newCS) + err = updateConnectionString(cmd.Context(), opts.client, cluster, newCS) + if err != nil { + return err + } + + // Wait ~1 min until the `ConfigMap` is synced to all Pods, you can check the `/var/dynamic-conf/fdb.cluster` inside a Pod if you are unsure. + time.Sleep(2 * time.Minute) + + // If the split image is used we have to update the copied files by making a POST request against the sidecar API. + // In the unified image, this step is not required as the dynamic files are directly mounted in the main container. + // We are not deleting the Pods as the operator is set to skip the reconciliation and therefore the deleted Pods + // would not be recreated. 
+ if !cluster.UseUnifiedImage() { + cmd.Println("The cluster uses the split image, the plugin will update the copied files") + for _, pod := range pods.Items { + loopPod := pod + + command := []string{"/bin/bash", "-c"} + + var curlStr strings.Builder + curlStr.WriteString("curl -X POST") + if internal.PodHasSidecarTLS(&loopPod) { + curlStr.WriteString(" --cacert ${FDB_TLS_CA_FILE} --cert ${FDB_TLS_CERTIFICATE_FILE} --key ${FDB_TLS_KEY_FILE} -k https://") + } else { + curlStr.WriteString(" http://") + } + + curlStr.WriteString(loopPod.Status.PodIP) + curlStr.WriteString(":8080/copy_files > /dev/null") + + command = append(command, curlStr.String()) + + err = kubeHelper.ExecuteCommandRaw(cmd.Context(), opts.client, opts.config, runningCoordinator.Namespace, runningCoordinator.Name, fdbv1beta2.MainContainerName, command, nil, cmd.OutOrStdout(), cmd.OutOrStderr(), false) + if err != nil { + return err + } + } + } + + cmd.Println("Killing fdbserver processes") + // Now all Pods must be restarted and the previous local cluster file must be deleted to make sure the fdbserver is picking the connection string from the seed cluster file (`/var/dynamic-conf/fdb.cluster`). + err = restartFdbserverInCluster(cmd.Context(), opts.client, opts.config, cluster) + if err != nil { + return err + } + + // Wait until all fdbservers have started again. + time.Sleep(1 * time.Minute) + + command := []string{"fdbcli", "--exec", fmt.Sprintf("force_recovery_with_data_loss %s", cluster.Spec.DataCenter)} + // Now you can exec into a container and use `fdbcli` to connect to the cluster. + // If you use a multi-region cluster you have to issue `force_recovery_with_data_loss` + cmd.Println("Triggering force recovery with command:", command) + err = kubeHelper.ExecuteCommandRaw(cmd.Context(), opts.client, opts.config, runningCoordinator.Namespace, runningCoordinator.Name, fdbv1beta2.MainContainerName, command, nil, cmd.OutOrStdout(), cmd.OutOrStderr(), false) + if err != nil { + return err + } + + // Now you can set `spec.Skip = false` to let the operator take over again. + // Skip the cluster, make sure the operator is not taking any action on the cluster. + err = setSkipReconciliation(cmd.Context(), opts.client, cluster, false) + if err != nil { + return err + } + + return nil +} + +// getDataDir will return the target data directory to upload the coordinator files to. The directory can be different, depending +// on the used image type and if more than one process should be running inside the Pod. +func getDataDir(dataDir string, pod *corev1.Pod, cluster *fdbv1beta2.FoundationDBCluster) string { + baseDir := dataDir + // If the dataDir has a suffix for the process we remove it. + if dataDir != "/var/fdb/data" { + baseDir = path.Dir(dataDir) + } + + // If the unified image is used we can simply return /var/fdb/data/1, as the unified image will always add the process + // directory, even if only a single process is running inside the Pod. 
+ if cluster.UseUnifiedImage() { + return path.Join(baseDir, "/1") + } + + // In this path we use the split image, so the process directory is only added if more than one process should be running + processClass := internal.GetProcessClassFromMeta(cluster, pod.ObjectMeta) + + if processClass.IsLogProcess() && cluster.GetLogServersPerPod() > 1 { + return path.Join(baseDir, "/1") + } + + if processClass == fdbv1beta2.ProcessClassStorage && cluster.GetStorageServersPerPod() > 1 { + return path.Join(baseDir, "/1") + } + + // This is the default case if we are running one process per Pod for this storage class and using the split image. + return baseDir +} + +func downloadCoordinatorFile(cmd *cobra.Command, kubeClient client.Client, config *rest.Config, pod *corev1.Pod, src string, dst string) error { + tmpCoordinatorFile, err := os.OpenFile(dst, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0600) + if err != nil { + return err + } + + defer func() { + _ = tmpCoordinatorFile.Close() + }() + + cmd.Println("Download files, target:", dst, "source", src, "pod", pod.Name, "namespace", pod.Namespace) + err = kubeHelper.DownloadFile(cmd.Context(), kubeClient, config, pod, fdbv1beta2.MainContainerName, src, tmpCoordinatorFile) + if err != nil { + return err + } + + fileInfo, err := os.Stat(tmpCoordinatorFile.Name()) + if err != nil { + return err + } + + if fileInfo.Size() <= 0 { + return fmt.Errorf("file %s is empty", tmpCoordinatorFile.Name()) + } + + return nil +} + +func uploadCoordinatorFile(cmd *cobra.Command, kubeClient client.Client, config *rest.Config, pod *corev1.Pod, src string, dst string) error { + tmpCoordinatorFile, err := os.OpenFile(src, os.O_RDONLY, 0600) + if err != nil { + return err + } + + defer func() { + _ = tmpCoordinatorFile.Close() + }() + + cmd.Println("Upload files, target:", dst, "source", src, "pod", pod.Name, "namespace", pod.Namespace) + + return kubeHelper.UploadFile(cmd.Context(), kubeClient, config, pod, fdbv1beta2.MainContainerName, tmpCoordinatorFile, dst) +} + +func restartFdbserverInCluster(ctx context.Context, kubeClient client.Client, config *rest.Config, cluster *fdbv1beta2.FoundationDBCluster) error { + pods, err := getRunningPodsForCluster(ctx, kubeClient, cluster) + if err != nil { + return err + } + + // Now all Pods must be restarted and the previous local cluster file must be deleted to make sure the fdbserver is picking the connection string from the seed cluster file (`/var/dynamic-conf/fdb.cluster`). 
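+ // Killing the fdbserver processes is enough, the monitor process in the Pod will restart them with the
+ // new seed cluster file; deleting the Pods is not an option while the operator is skipped.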
+ for _, pod := range pods.Items { + _, _, err := kubeHelper.ExecuteCommand(context.Background(), kubeClient, config, pod.Namespace, pod.Name, fdbv1beta2.MainContainerName, "pkill fdbserver && rm -f /var/fdb/data/fdb.cluster && pkill fdbserver || true", false) + if err != nil { + return err + } + } + + return nil +} + +func checkIfClusterIsUnavailableAndMajorityOfCoordinatorsAreUnreachable(cmd *cobra.Command, kubeClient client.Client, config *rest.Config, cluster *fdbv1beta2.FoundationDBCluster) error { + pods, err := getRunningPodsForCluster(cmd.Context(), kubeClient, cluster) + if err != nil { + return err + } + + clientPod, err := kubeHelper.PickRandomPod(pods) + if err != nil { + return err + } + + cmd.Println("Getting the status from:", clientPod.Name) + status, err := getStatus(cmd.Context(), kubeClient, config, clientPod) + if err != nil { + return err + } + + if status.Client.DatabaseStatus.Available { + return fmt.Errorf("cluster is available, will abort any further actions") + } + + if status.Client.DatabaseStatus.Healthy { + return fmt.Errorf("cluster is healthy, will abort any further actions") + } + + if status.Client.Coordinators.QuorumReachable { + return fmt.Errorf("quorum of coordinators are reachable, will abort any further actions") + } + + return nil +} diff --git a/kubectl-fdb/cmd/recover_multi_region_cluster_test.go b/kubectl-fdb/cmd/recover_multi_region_cluster_test.go new file mode 100644 index 000000000..841afaeff --- /dev/null +++ b/kubectl-fdb/cmd/recover_multi_region_cluster_test.go @@ -0,0 +1,127 @@ +/* + * recover_multi_region_cluster_test.go + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2021-2024 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + fdbv1beta2 "github.com/FoundationDB/fdb-kubernetes-operator/api/v1beta2" + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var _ = Describe("[plugin] running the recover multi-region cluster command", func() { + var imageTypeUnified = fdbv1beta2.ImageTypeUnified + type dataDirTest struct { + input string + pod *corev1.Pod + cluster *fdbv1beta2.FoundationDBCluster + expected string + } + + DescribeTable("when getting the data dir for uploading files", func(test dataDirTest) { + Expect(getDataDir(test.input, test.pod, test.cluster)).To(Equal(test.expected)) + }, + Entry("data dir is already correct", + dataDirTest{ + input: "/var/fdb/data", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + fdbv1beta2.FDBProcessClassLabel: string(fdbv1beta2.ProcessClassStorage), + }, + }, + }, + cluster: &fdbv1beta2.FoundationDBCluster{}, + expected: "/var/fdb/data", + }, + ), + Entry("data dir has process directory and target pod has single process", + dataDirTest{ + input: "/var/fdb/data/2", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + fdbv1beta2.FDBProcessClassLabel: string(fdbv1beta2.ProcessClassStorage), + }, + }, + }, + cluster: &fdbv1beta2.FoundationDBCluster{}, + expected: "/var/fdb/data", + }, + ), + Entry("data dir has process directory and target pod has multiple processes", + dataDirTest{ + input: "/var/fdb/data/2", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + fdbv1beta2.FDBProcessClassLabel: string(fdbv1beta2.ProcessClassStorage), + }, + }, + }, + cluster: &fdbv1beta2.FoundationDBCluster{ + Spec: fdbv1beta2.FoundationDBClusterSpec{ + StorageServersPerPod: 2, + }, + }, + expected: "/var/fdb/data/1", + }, + ), + Entry("data dir has process directory and target pod has single process with unified image", + dataDirTest{ + input: "/var/fdb/data/2", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + fdbv1beta2.FDBProcessClassLabel: string(fdbv1beta2.ProcessClassStorage), + }, + }, + }, + cluster: &fdbv1beta2.FoundationDBCluster{ + Spec: fdbv1beta2.FoundationDBClusterSpec{ + ImageType: &imageTypeUnified, + }, + }, + expected: "/var/fdb/data/1", + }, + ), + Entry("data dir has process directory and target pod has multiple processes with unified image", + dataDirTest{ + input: "/var/fdb/data/2", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + fdbv1beta2.FDBProcessClassLabel: string(fdbv1beta2.ProcessClassStorage), + }, + }, + }, + cluster: &fdbv1beta2.FoundationDBCluster{ + Spec: fdbv1beta2.FoundationDBClusterSpec{ + ImageType: &imageTypeUnified, + StorageServersPerPod: 2, + }, + }, + expected: "/var/fdb/data/1", + }, + ), + ) +}) diff --git a/kubectl-fdb/cmd/root.go b/kubectl-fdb/cmd/root.go index 18731de80..62f3124a7 100644 --- a/kubectl-fdb/cmd/root.go +++ b/kubectl-fdb/cmd/root.go @@ -23,17 +23,14 @@ package cmd import ( "bufio" "fmt" - "log" - "math/rand" - "os" - "strings" - "time" - fdbv1beta2 "github.com/FoundationDB/fdb-kubernetes-operator/api/v1beta2" "github.com/fatih/color" "github.com/spf13/cobra" "github.com/spf13/viper" "k8s.io/cli-runtime/pkg/genericclioptions" + "log" + "os" + "strings" ) // fdbBOptions provides information required to run different @@ -53,8 +50,6 @@ func newFDBOptions(streams genericclioptions.IOStreams) *fdbBOptions { // NewRootCmd provides a cobra command wrapping FDB actions func NewRootCmd(streams genericclioptions.IOStreams, pluginVersionChecker 
VersionChecker) *cobra.Command { - rand.Seed(time.Now().Unix()) - o := newFDBOptions(streams) cmd := &cobra.Command{ @@ -96,6 +91,7 @@ func NewRootCmd(streams genericclioptions.IOStreams, pluginVersionChecker Versio newFixCoordinatorIPsCmd(streams), newGetCmd(streams), newBuggifyCmd(streams), + newRecoverMultiRegionClusterCmd(streams), ) return cmd