Skip to content

Commit

Permalink
Add recovery plugin clean (#2127)
Browse files Browse the repository at this point in the history
* Add support for multi-region recovery in kubectl-fdb
  • Loading branch information
johscheuer committed Sep 9, 2024
1 parent f37605d commit cdd05a5
Show file tree
Hide file tree
Showing 14 changed files with 794 additions and 303 deletions.
3 changes: 3 additions & 0 deletions docs/manual/operations.md
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,9 @@ If the cluster is a multi-region cluster, perform this step for all running regi
- Now you can set `spec.Skip = false` to let the operator take over again.
- Depending on the state of the multi-region cluster, you probably want to change the desired database configuration to drop ha.

The [kubectl-fdb plugin](../../kubectl-fdb/Readme.md) provides a `recover-multi-region-cluster` command that can be used to automatically recover a cluster with the above steps.
The command has some additional safety checks to ensure the steps are only performed on a cluster that is unhealthy and where the majority of coordinators are unreachable.

## Next

You can continue on to the [next section](scaling.md) or go back to the [table of contents](index.md).
1 change: 1 addition & 0 deletions e2e/fixtures/fdb_operator_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ rules:
resources:
- pods/exec
verbs:
- get
- create
- apiGroups:
- apps.foundationdb.org
Expand Down
242 changes: 4 additions & 238 deletions e2e/test_operator_ha/operator_ha_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,8 @@ This cluster will be used for all tests.
*/

import (
"context"
"fmt"
"log"
"os"
"path"
"strconv"
"strings"
"time"

fdbv1beta2 "github.com/FoundationDB/fdb-kubernetes-operator/api/v1beta2"
Expand All @@ -52,11 +47,9 @@ import (
)

var (
factory *fixtures.Factory
fdbCluster *fixtures.HaFdbCluster
testOptions *fixtures.FactoryOptions
clusterConfig *fixtures.ClusterConfig
clusterOptions []fixtures.ClusterOption
factory *fixtures.Factory
fdbCluster *fixtures.HaFdbCluster
testOptions *fixtures.FactoryOptions
)

func init() {
Expand All @@ -65,9 +58,7 @@ func init() {

var _ = BeforeSuite(func() {
factory = fixtures.CreateFactory(testOptions)
clusterOptions = factory.GetClusterOptions()
clusterConfig = fixtures.DefaultClusterConfigWithHaMode(fixtures.HaFourZoneSingleSat, false)
fdbCluster = factory.CreateFdbHaCluster(clusterConfig, clusterOptions...)
fdbCluster = factory.CreateFdbHaCluster(fixtures.DefaultClusterConfigWithHaMode(fixtures.HaFourZoneSingleSat, false), factory.GetClusterOptions()...)

// Load some data into the cluster.
factory.CreateDataLoaderIfAbsent(fdbCluster.GetPrimary())
Expand Down Expand Up @@ -220,229 +211,4 @@ var _ = Describe("Operator HA tests", Label("e2e", "pr"), func() {
}).WithTimeout(10 * time.Minute).WithPolling(2 * time.Second).Should(BeNumerically(">=", desiredRunningPods))
})
})

When("all Pods in the primary and primary satellite are down", func() {
BeforeEach(func() {
	// This is a destructive test: the cluster will stop working for some period, so the
	// availability check is turned off for it.
	availabilityCheck = false

	// Skip reconciliation for the primary and the primary satellite before their Pods are
	// deleted below.
	primary := fdbCluster.GetPrimary()
	primary.SetSkipReconciliation(true)

	primarySatellite := fdbCluster.GetPrimarySatellite()
	primarySatellite.SetSkipReconciliation(true)

	// Index into the slices instead of taking the address of the range variable: this avoids
	// aliasing the loop variable (Go < 1.22) and copying every Pod once per iteration.
	primaryPods := primary.GetPods()
	for idx := range primaryPods.Items {
		factory.DeletePod(&primaryPods.Items[idx])
	}

	primarySatellitePods := primarySatellite.GetPods()
	for idx := range primarySatellitePods.Items {
		factory.DeletePod(&primarySatellitePods.Items[idx])
	}

	// Wait a short amount of time to let the cluster see that the primary and the primary
	// satellite are down.
	time.Sleep(5 * time.Second)
})

AfterEach(func() {
	// Tear down the broken cluster.
	fdbCluster.Delete()

	// Create a fresh HA cluster with the same configuration and options, so that the next
	// tests can proceed.
	recreated := factory.CreateFdbHaCluster(clusterConfig, clusterOptions...)
	fdbCluster = recreated

	// Load some data into the new cluster's primary.
	factory.CreateDataLoaderIfAbsent(recreated.GetPrimary())
})

It("should recover the coordinators", func() {
	const (
		// desiredCoordinatorCount is the number of coordinators the new connection string should contain.
		desiredCoordinatorCount = 5
		// restartCommand kills the fdbserver processes and removes the local cluster file.
		restartCommand = "pkill fdbserver && rm -f /var/fdb/data/fdb.cluster && pkill fdbserver || true"
		// debugOutput is passed to the exec helper for the restart commands.
		debugOutput = true
	)

	// Set all the `FoundationDBCluster` resources for this FDB cluster to `spec.Skip = true` to make sure the
	// operator is not changing the manually changed state.
	remote := fdbCluster.GetRemote()
	remote.SetSkipReconciliation(true)
	remoteSatellite := fdbCluster.GetRemoteSatellite()
	remoteSatellite.SetSkipReconciliation(true)

	// Fetch the last connection string from the `FoundationDBCluster` status,
	// e.g. `kubectl get fdb ${cluster} -o jsonpath='{ .status.connectionString }'`.
	lastConnectionString := remote.GetCluster().Status.ConnectionString
	log.Println("lastConnectionString", lastConnectionString)

	lastConnectionStringParts := strings.Split(lastConnectionString, "@")
	// Fail with a readable assertion instead of an index-out-of-range panic if the status holds
	// an empty or malformed connection string.
	Expect(lastConnectionStringParts).To(HaveLen(2), "connection string must contain exactly one '@'")
	addresses := strings.Split(lastConnectionStringParts[1], ",")
	// Since this is a multi-region cluster, we expect 9 coordinators.
	Expect(addresses).To(HaveLen(9))

	var useTLS bool
	coordinators := make(map[string]fdbv1beta2.ProcessAddress, len(addresses))
	for _, addr := range addresses {
		parsed, err := fdbv1beta2.ParseProcessAddress(addr)
		Expect(err).NotTo(HaveOccurred())
		log.Println("found coordinator", parsed.String())
		coordinators[parsed.MachineAddress()] = parsed
		// If the tls flag is present on any coordinator we assume that the coordinators should make use
		// of TLS. A later coordinator without the flag must not reset the value.
		if _, hasTLS := parsed.Flags["tls"]; hasTLS {
			useTLS = true
		}
	}

	log.Println("coordinators", coordinators, "useTLS", useTLS)

	var runningCoordinator *corev1.Pod
	newCoordinators := make([]fdbv1beta2.ProcessAddress, 0, desiredCoordinatorCount)
	remotePods := remote.GetPods()
	candidates := make([]corev1.Pod, 0, len(remotePods.Items))

	// classifyPods sorts the provided Pods into coordinators that are still running (kept in the new
	// connection string) and transaction-class Pods that can be promoted to coordinators.
	classifyPods := func(source string, pods []corev1.Pod) {
		for _, pod := range pods {
			addr, err := fdbv1beta2.ParseProcessAddress(pod.Status.PodIP)
			Expect(err).NotTo(HaveOccurred())

			if coordinatorAddr, ok := coordinators[addr.MachineAddress()]; ok {
				log.Println("Found coordinator for "+source, pod.Name, "address", coordinatorAddr.String())
				newCoordinators = append(newCoordinators, coordinatorAddr)
				// Remember the first surviving coordinator, its state files are copied later.
				if runningCoordinator == nil {
					loopPod := pod
					runningCoordinator = &loopPod
				}

				continue
			}

			if !fixtures.GetProcessClass(pod).IsTransaction() {
				continue
			}

			candidates = append(candidates, pod)
		}
	}

	classifyPods("remote", remotePods.Items)
	classifyPods("remote satellite", remoteSatellite.GetPods().Items)

	// The coordinator state is copied from a surviving coordinator, so fail early with a clear message
	// instead of dereferencing a nil Pod below.
	Expect(runningCoordinator).NotTo(BeNil(), "expected at least one running coordinator in the remote or remote satellite")

	// Pick new coordinators from the candidates until the desired count is reached.
	needsUpload := make([]corev1.Pod, 0, desiredCoordinatorCount)
	for idx := 0; len(newCoordinators) < desiredCoordinatorCount; idx++ {
		Expect(idx).To(BeNumerically("<", len(candidates)), "not enough candidate Pods to select new coordinators")
		log.Println("Current coordinators:", len(newCoordinators))

		candidate := candidates[idx]
		addr, err := fdbv1beta2.ParseProcessAddress(candidate.Status.PodIP)
		Expect(err).NotTo(HaveOccurred())
		log.Println("Adding pod as new coordinators:", candidate.Name)
		if useTLS {
			addr.Port = 4500
			addr.Flags = map[string]bool{"tls": true}
		} else {
			addr.Port = 4501
		}

		newCoordinators = append(newCoordinators, addr)
		needsUpload = append(needsUpload, candidate)
	}

	// Copy the coordinator state from one of the running coordinators to your local machine:
	coordinatorFiles := []string{"coordination-0.fdq", "coordination-1.fdq"}
	tmpDir := GinkgoT().TempDir()
	tmpCoordinatorFiles := make([]string, len(coordinatorFiles))
	for idx, coordinatorFile := range coordinatorFiles {
		tmpCoordinatorFiles[idx] = path.Join(tmpDir, coordinatorFile)
	}

	log.Println("tmpCoordinatorFiles", tmpCoordinatorFiles)
	stdout, stderr, err := factory.ExecuteCmdOnPod(context.Background(), runningCoordinator, fdbv1beta2.MainContainerName, "find /var/fdb/data/ -type f -name 'coordination-0.fdq'", true)
	Expect(err).NotTo(HaveOccurred())
	Expect(stderr).To(BeEmpty())

	coordinatorFilePath := strings.TrimSpace(stdout)
	// An empty result would make path.Dir return "." and the transfers below would use a wrong directory.
	Expect(coordinatorFilePath).NotTo(BeEmpty(), "coordinator state file not found on the running coordinator")
	dataDir := path.Dir(coordinatorFilePath)
	log.Println("find result:", stdout, ",dataDir", dataDir)

	for idx, coordinatorFile := range coordinatorFiles {
		tmpCoordinatorFile, err := os.OpenFile(tmpCoordinatorFiles[idx], os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0600)
		Expect(err).NotTo(HaveOccurred())

		log.Println("Download files, target:", tmpCoordinatorFiles[idx], "source", path.Join(dataDir, coordinatorFile), "pod", runningCoordinator.Name, "namespace", runningCoordinator.Namespace)
		err = factory.DownloadFile(context.Background(), runningCoordinator, fdbv1beta2.MainContainerName, path.Join(dataDir, coordinatorFile), tmpCoordinatorFile)
		Expect(err).NotTo(HaveOccurred())
		Expect(tmpCoordinatorFile.Close()).NotTo(HaveOccurred())

		// Make sure the download actually produced content.
		fileInfo, err := os.Stat(tmpCoordinatorFiles[idx])
		Expect(err).NotTo(HaveOccurred())
		Expect(fileInfo.Size()).To(BeNumerically(">", 0))
	}

	// Upload the coordinator state to every Pod that was newly selected as coordinator.
	for targetIdx := range needsUpload {
		// Point at the slice element instead of the address of a range variable.
		target := &needsUpload[targetIdx]
		for idx, coordinatorFile := range coordinatorFiles {
			tmpCoordinatorFile, err := os.OpenFile(tmpCoordinatorFiles[idx], os.O_RDONLY, 0600)
			Expect(err).NotTo(HaveOccurred())

			log.Println("Upload files, source:", tmpCoordinatorFile.Name(), "target", path.Join(dataDir, coordinatorFile), "pod", target.Name, "namespace", target.Namespace)
			err = factory.UploadFile(context.Background(), target, fdbv1beta2.MainContainerName, tmpCoordinatorFile, path.Join(dataDir, coordinatorFile))
			Expect(err).NotTo(HaveOccurred())
			Expect(tmpCoordinatorFile.Close()).NotTo(HaveOccurred())
		}
	}

	// Update the `ConfigMap` to contain the new connection string, the new connection string must contain the
	// still existing coordinators and the new coordinators. The old entries must be removed.
	coordinatorStrings := make([]string, 0, len(newCoordinators))
	for _, coordinator := range newCoordinators {
		coordinatorStrings = append(coordinatorStrings, coordinator.String())
	}

	newCS := lastConnectionStringParts[0] + "@" + strings.Join(coordinatorStrings, ",")
	log.Println("new connection string:", newCS)
	for _, cluster := range fdbCluster.GetAllClusters() {
		cluster.UpdateConnectionString(newCS)
	}

	// Wait 2 minutes until the `ConfigMap` is synced to all Pods, you can check the
	// `/var/dynamic-conf/fdb.cluster` inside a Pod if you are unsure.
	time.Sleep(2 * time.Minute)

	log.Println("Kill fdbserver processes")

	// Now all Pods must be restarted and the previous local cluster file must be deleted to make sure the
	// fdbserver is picking the connection string from the seed cluster file (`/var/dynamic-conf/fdb.cluster`).
	restartProcesses := func(pods []corev1.Pod) {
		for _, pod := range pods {
			_, _, err := factory.ExecuteCmd(context.Background(), pod.Namespace, pod.Name, fdbv1beta2.MainContainerName, restartCommand, debugOutput)
			Expect(err).NotTo(HaveOccurred())
		}
	}

	restartProcesses(remote.GetPods().Items)
	restartProcesses(remoteSatellite.GetPods().Items)

	log.Println("force recovery")
	// Now you can exec into a container and use `fdbcli` to connect to the cluster.
	// If you use a multi-region cluster you have to issue `force_recovery_with_data_loss`
	_, _, err = remote.RunFdbCliCommandInOperatorWithoutRetry(fmt.Sprintf("force_recovery_with_data_loss %s", remote.GetCluster().Spec.DataCenter), true, 40)
	Expect(err).NotTo(HaveOccurred())

	// Now you can set `spec.Skip = false` to let the operator take over again.
	remote.SetSkipReconciliation(false)
	remoteSatellite.SetSkipReconciliation(false)

	// Ensure the cluster is available again.
	Eventually(func() bool {
		return remote.GetStatus().Client.DatabaseStatus.Available
	}).WithTimeout(2 * time.Minute).WithPolling(1 * time.Second).Should(BeTrue())

	// TODO (johscheuer): Add additional testing for different cases.
})
})
})
Loading

0 comments on commit cdd05a5

Please sign in to comment.