Make sure that the split image setup works and the copy_files call is triggered by the plugin
johscheuer committed Sep 9, 2024
1 parent bc83edf commit 0ec30b0
Showing 7 changed files with 121 additions and 97 deletions.
10 changes: 4 additions & 6 deletions e2e/test_operator_plugin/operator_plugin_test.go
@@ -74,7 +74,7 @@ var _ = Describe("Operator Plugin", Label("e2e", "pr"), func() {
// Pick one operator pod and execute the kubectl version command to ensure that kubectl-fdb is present
// and can be executed.
operatorPod := factory.RandomPickOnePod(factory.GetOperatorPods(fdbCluster.GetPrimary().Namespace()).Items)
log.Println("operatorPod", operatorPod.Name)
log.Println("operatorPod:", operatorPod.Name)
Eventually(func(g Gomega) string {
stdout, stderr, err := factory.ExecuteCmdOnPod(context.Background(), &operatorPod, "manager", fmt.Sprintf("kubectl-fdb -n %s --version-check=false version", fdbCluster.GetPrimary().Namespace()), false)
g.Expect(err).NotTo(HaveOccurred(), stderr)
@@ -147,11 +147,9 @@ var _ = Describe("Operator Plugin", Label("e2e", "pr"), func() {
// Pick one operator pod and execute the recovery command
operatorPod := factory.RandomPickOnePod(factory.GetOperatorPods(remote.Namespace()).Items)
log.Println("operatorPod:", operatorPod.Name)
Eventually(func() error {
stdout, stderr, err := factory.ExecuteCmdOnPod(context.Background(), &operatorPod, "manager", fmt.Sprintf("kubectl-fdb -n %s recover-multi-region-cluster --version-check=false --wait=false %s", remote.Namespace(), remote.Name()), false)
log.Println("stdout:", stdout, "stderr:", stderr)
return err
}).WithTimeout(30 * time.Minute).WithPolling(5 * time.Minute).ShouldNot(HaveOccurred())
stdout, stderr, err := factory.ExecuteCmdOnPod(context.Background(), &operatorPod, "manager", fmt.Sprintf("kubectl-fdb -n %s recover-multi-region-cluster --version-check=false --wait=false %s", remote.Namespace(), remote.Name()), false)
log.Println("stdout:", stdout, "stderr:", stderr)
Expect(err).NotTo(HaveOccurred())

// Ensure the cluster is available again.
Eventually(func() bool {
18 changes: 1 addition & 17 deletions internal/pod_client.go
@@ -105,7 +105,7 @@ func NewFdbPodClient(cluster *fdbv1beta2.FoundationDBCluster, pod *corev1.Pod, l
}
}

useTLS := podHasSidecarTLS(pod)
useTLS := PodHasSidecarTLS(pod)

var tlsConfig = &tls.Config{}
if useTLS {
@@ -375,22 +375,6 @@ func (client *realFdbPodAnnotationClient) IsPresent(_ string) (bool, error) {
return true, nil
}

// podHasSidecarTLS determines whether a pod currently has TLS enabled for the
// sidecar process.
func podHasSidecarTLS(pod *corev1.Pod) bool {
for _, container := range pod.Spec.Containers {
if container.Name == fdbv1beta2.SidecarContainerName {
for _, arg := range container.Args {
if arg == "--tls" {
return true
}
}
}
}

return false
}

// GetImageType determines whether a pod is using the unified or the split image.
func GetImageType(pod *corev1.Pod) fdbv1beta2.ImageType {
return GetImageTypeFromAnnotation(pod.Annotations)
4 changes: 2 additions & 2 deletions internal/pod_client_test.go
@@ -48,7 +48,7 @@ var _ = Describe("pod_client", func() {
It("should not have TLS sidecar TLS", func() {
pod, err := GetPod(cluster, GetProcessGroup(cluster, fdbv1beta2.ProcessClassStorage, 1))
Expect(err).NotTo(HaveOccurred())
Expect(podHasSidecarTLS(pod)).To(BeFalse())
Expect(PodHasSidecarTLS(pod)).To(BeFalse())
})
})

@@ -60,7 +60,7 @@ var _ = Describe("pod_client", func() {
It("should have TLS sidecar TLS", func() {
pod, err := GetPod(cluster, GetProcessGroup(cluster, fdbv1beta2.ProcessClassStorage, 1))
Expect(err).NotTo(HaveOccurred())
Expect(podHasSidecarTLS(pod)).To(BeTrue())
Expect(PodHasSidecarTLS(pod)).To(BeTrue())
})
})

16 changes: 16 additions & 0 deletions internal/pod_helper.go
@@ -236,3 +236,19 @@ func GetPublicIPSource(pod *corev1.Pod) (fdbv1beta2.PublicIPSource, error) {
}
return fdbv1beta2.PublicIPSource(source), nil
}

// PodHasSidecarTLS determines whether a pod currently has TLS enabled for the sidecar process.
// This method should only be used for split images.
func PodHasSidecarTLS(pod *corev1.Pod) bool {
for _, container := range pod.Spec.Containers {
if container.Name == fdbv1beta2.SidecarContainerName {
for _, arg := range container.Args {
if arg == "--tls" {
return true
}
}
}
}

return false
}
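
Moving the helper into pod_helper.go and exporting it as PodHasSidecarTLS lets code outside the internal pod client, such as the plugin change further down, call internal.PodHasSidecarTLS when choosing between http and https for the sidecar API. A minimal sketch of such a caller, assuming it is built inside this repository's module; the import path is taken from the other imports in this commit and the pod value is a placeholder:

// Sketch only: picking the URL scheme for the sidecar API of a split-image pod.
package main

import (
	"fmt"

	"github.com/FoundationDB/fdb-kubernetes-operator/internal"
	corev1 "k8s.io/api/core/v1"
)

// sidecarScheme returns "https://" when the sidecar container was started with --tls.
func sidecarScheme(pod *corev1.Pod) string {
	if internal.PodHasSidecarTLS(pod) {
		return "https://"
	}
	return "http://"
}

func main() {
	// Prints "http://" for an empty pod, since no container carries the --tls argument.
	fmt.Println(sidecarScheme(&corev1.Pod{}))
}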
10 changes: 4 additions & 6 deletions kubectl-fdb/cmd/exec.go
@@ -21,9 +21,7 @@
package cmd

import (
"context"
"log"
"os"

fdbv1beta2 "github.com/FoundationDB/fdb-kubernetes-operator/api/v1beta2"
kubeHelper "github.com/FoundationDB/fdb-kubernetes-operator/internal/kubernetes"
@@ -66,7 +64,7 @@ func newExecCmd(streams genericclioptions.IOStreams) *cobra.Command {
return err
}

return runExec(cmd.Context(), kubeClient, cluster, config, args)
return runExec(cmd, kubeClient, cluster, config, args)
},
Example: `
# Open a shell.
@@ -91,8 +89,8 @@ func newExecCmd(streams genericclioptions.IOStreams) *cobra.Command {
return cmd
}

func runExec(ctx context.Context, kubeClient client.Client, cluster *fdbv1beta2.FoundationDBCluster, config *rest.Config, commandArgs []string) error {
pods, err := getRunningPodsForCluster(ctx, kubeClient, cluster)
func runExec(cmd *cobra.Command, kubeClient client.Client, cluster *fdbv1beta2.FoundationDBCluster, config *rest.Config, commandArgs []string) error {
pods, err := getRunningPodsForCluster(cmd.Context(), kubeClient, cluster)
if err != nil {
return err
}
@@ -102,5 +100,5 @@ func runExec(cmd *cobra.Command, kubeClient client.Client, cluster *fdbv1beta2.
return err
}

return kubeHelper.ExecuteCommandRaw(ctx, kubeClient, config, clientPod.Namespace, clientPod.Name, fdbv1beta2.MainContainerName, commandArgs, os.Stdin, os.Stdout, os.Stderr, true)
return kubeHelper.ExecuteCommandRaw(cmd.Context(), kubeClient, config, clientPod.Namespace, clientPod.Name, fdbv1beta2.MainContainerName, commandArgs, cmd.InOrStdin(), cmd.OutOrStdout(), cmd.OutOrStderr(), true)
}
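
runExec now receives the *cobra.Command instead of a bare context: the context still comes from cmd.Context(), and stdin, stdout, and stderr are taken from the command via cmd.InOrStdin(), cmd.OutOrStdout(), and cmd.OutOrStderr(), so a caller or a test can redirect them without touching os.Stdin or os.Stdout. A minimal sketch of that cobra pattern, using a placeholder command that is not part of this commit:

// Sketch only: a command that writes to whatever output the caller wired up.
package main

import (
	"bytes"
	"fmt"

	"github.com/spf13/cobra"
)

func main() {
	cmd := &cobra.Command{
		Use: "demo",
		RunE: func(cmd *cobra.Command, _ []string) error {
			_, err := fmt.Fprintln(cmd.OutOrStdout(), "hello")
			return err
		},
	}

	var out bytes.Buffer
	cmd.SetOut(&out) // a test would assert on out.String() afterwards
	if err := cmd.Execute(); err != nil {
		panic(err)
	}
	fmt.Print(out.String())
}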
9 changes: 8 additions & 1 deletion kubectl-fdb/cmd/exec_test.go
@@ -21,7 +21,9 @@
package cmd

import (
"bytes"
"context"
"k8s.io/cli-runtime/pkg/genericclioptions"

fdbv1beta2 "github.com/FoundationDB/fdb-kubernetes-operator/api/v1beta2"
. "github.com/onsi/ginkgo/v2"
@@ -56,7 +58,12 @@ var _ = Describe("[plugin] exec command", func() {

DescribeTable("should execute the provided command",
func(input testCase) {
Expect(runExec(context.Background(), k8sClient, cluster, &rest.Config{}, input.Command)).NotTo(HaveOccurred())
outBuffer := bytes.Buffer{}
errBuffer := bytes.Buffer{}
inBuffer := bytes.Buffer{}

rootCmd := NewRootCmd(genericclioptions.IOStreams{In: &inBuffer, Out: &outBuffer, ErrOut: &errBuffer}, &MockVersionChecker{})
Expect(runExec(rootCmd, k8sClient, cluster, &rest.Config{}, input.Command)).NotTo(HaveOccurred())
},
Entry("Exec into instance with valid pod",
testCase{
151 changes: 86 additions & 65 deletions kubectl-fdb/cmd/recover_multi_region_cluster.go
@@ -142,10 +142,7 @@ func recoverMultiRegionCluster(cmd *cobra.Command, opts recoverMultiRegionCluste
lastConnectionString := cluster.Status.ConnectionString
lastConnectionStringParts := strings.Split(lastConnectionString, "@")
addresses := strings.Split(lastConnectionStringParts[1], ",")
// Since this is a multi-region cluster, we expect 9 coordinators.
if len(addresses) != 9 {
return fmt.Errorf("expected exactly 9 addresses, got %d", len(addresses))
}

cmd.Println("current connection string", lastConnectionString)

var useTLS bool
@@ -206,9 +203,28 @@ func recoverMultiRegionCluster(cmd *cobra.Command, opts recoverMultiRegionCluste
return fmt.Errorf("could not find any running coordinator for this cluster")
}

// Drop the multi-region setup if present.
newDatabaseConfiguration := cluster.Spec.DatabaseConfiguration.DeepCopy()
// Drop the multi-region configuration.
newDatabaseConfiguration.UsableRegions = 1
newDatabaseConfiguration.Regions = []fdbv1beta2.Region{
{
DataCenters: []fdbv1beta2.DataCenter{
{
ID: cluster.Spec.DataCenter,
},
},
},
}
cmd.Println("Update the database configuration to single region configuration")
err = updateDatabaseConfiguration(cmd.Context(), opts.client, cluster, *newDatabaseConfiguration)
if err != nil {
return err
}

// Pick 5 new coordinators.
needsUpload := make([]*corev1.Pod, 0, 5)
for len(newCoordinators) < 5 {
needsUpload := make([]*corev1.Pod, 0, cluster.DesiredCoordinatorCount())
for len(newCoordinators) < cluster.DesiredCoordinatorCount() {
cmd.Println("Current coordinators:", len(newCoordinators))
candidate := candidates[len(newCoordinators)]
addr, parseErr := fdbv1beta2.ParseProcessAddress(candidate.Status.PodIP)
@@ -226,44 +242,47 @@ func recoverMultiRegionCluster(cmd *cobra.Command, opts recoverMultiRegionCluste
needsUpload = append(needsUpload, candidate)
}

// Copy the coordinator state from one of the running coordinators to your local machine:
coordinatorFiles := []string{"coordination-0.fdq", "coordination-1.fdq"}
tmpCoordinatorFiles := make([]string, 2)
tmpDir := os.TempDir()
for idx, coordinatorFile := range coordinatorFiles {
tmpCoordinatorFiles[idx] = path.Join(tmpDir, coordinatorFile)
}

cmd.Println("tmpCoordinatorFiles", tmpCoordinatorFiles, "checking the location of the coordination-0.fdq in Pod", runningCoordinator.Name)
stdout, stderr, err := kubeHelper.ExecuteCommandOnPod(context.Background(), opts.client, opts.config, runningCoordinator, fdbv1beta2.MainContainerName, "find /var/fdb/data/ -type f -name 'coordination-0.fdq' -print -quit | head -n 1", false)
if err != nil {
cmd.Println(stderr)
return err
}

lines := strings.Split(stdout, "\n")
if len(lines) == 0 {
return fmt.Errorf("no coordination file found in %s", runningCoordinator.Name)
}
// If at least one coordinator needs to get the files uploaded, we perform the download and upload for the coordinators.
if len(needsUpload) > 0 {
// Copy the coordinator state from one of the running coordinators to your local machine:
coordinatorFiles := []string{"coordination-0.fdq", "coordination-1.fdq"}
tmpCoordinatorFiles := make([]string, 2)
tmpDir := os.TempDir()
for idx, coordinatorFile := range coordinatorFiles {
tmpCoordinatorFiles[idx] = path.Join(tmpDir, coordinatorFile)
}

dataDir := path.Dir(strings.TrimSpace(lines[0]))
cmd.Println("dataDir:", dataDir)
for idx, coordinatorFile := range coordinatorFiles {
err = downloadCoordinatorFile(cmd, opts.client, opts.config, runningCoordinator, path.Join(dataDir, coordinatorFile), tmpCoordinatorFiles[idx])
cmd.Println("tmpCoordinatorFiles", tmpCoordinatorFiles, "checking the location of the coordination-0.fdq in Pod", runningCoordinator.Name)
stdout, stderr, err := kubeHelper.ExecuteCommandOnPod(context.Background(), opts.client, opts.config, runningCoordinator, fdbv1beta2.MainContainerName, "find /var/fdb/data/ -type f -name 'coordination-0.fdq' -print -quit | head -n 1", false)
if err != nil {
cmd.Println(stderr)
return err
}
}

for _, target := range needsUpload {
targetDataDir := getDataDir(dataDir, target, cluster)
lines := strings.Split(stdout, "\n")
if len(lines) == 0 {
return fmt.Errorf("no coordination file found in %s", runningCoordinator.Name)
}

dataDir := path.Dir(strings.TrimSpace(lines[0]))
cmd.Println("dataDir:", dataDir)
for idx, coordinatorFile := range coordinatorFiles {
err = uploadCoordinatorFile(cmd, opts.client, opts.config, target, tmpCoordinatorFiles[idx], path.Join(targetDataDir, coordinatorFile))
err = downloadCoordinatorFile(cmd, opts.client, opts.config, runningCoordinator, path.Join(dataDir, coordinatorFile), tmpCoordinatorFiles[idx])
if err != nil {
return err
}
}

for _, target := range needsUpload {
targetDataDir := getDataDir(dataDir, target, cluster)

for idx, coordinatorFile := range coordinatorFiles {
err = uploadCoordinatorFile(cmd, opts.client, opts.config, target, tmpCoordinatorFiles[idx], path.Join(targetDataDir, coordinatorFile))
if err != nil {
return err
}
}
}
}

// Update the `ConfigMap` to contain the new connection string, the new connection string must contain the still existing coordinators and the new coordinators. The old entries must be removed.
@@ -289,6 +308,37 @@ func recoverMultiRegionCluster(cmd *cobra.Command, opts recoverMultiRegionCluste
// Wait ~1 min until the `ConfigMap` is synced to all Pods, you can check the `/var/dynamic-conf/fdb.cluster` inside a Pod if you are unsure.
time.Sleep(2 * time.Minute)

// If the split image is used we have to update the copied files by making a POST request against the sidecar API.
// In the unified image, this step is not required as the dynamic files are directly mounted in the main container.
// We are not deleting the Pods as the operator is set to skip the reconciliation and therefore the deleted Pods
// would not be recreated.
if !cluster.UseUnifiedImage() {
cmd.Println("The cluster uses the split image, the plugin will update the copied files")
for _, pod := range pods.Items {
loopPod := pod

command := []string{"/bin/bash", "-c"}

var curlStr strings.Builder
curlStr.WriteString("curl -X POST")
if internal.PodHasSidecarTLS(&loopPod) {
curlStr.WriteString(" --cacert ${FDB_TLS_CA_FILE} --cert ${FDB_TLS_CERTIFICATE_FILE} --key ${FDB_TLS_KEY_FILE} -k https://")
} else {
curlStr.WriteString(" http://")
}

curlStr.WriteString(loopPod.Status.PodIP)
curlStr.WriteString(":8080/copy_files > /dev/null")

command = append(command, curlStr.String())

err = kubeHelper.ExecuteCommandRaw(cmd.Context(), opts.client, opts.config, runningCoordinator.Namespace, runningCoordinator.Name, fdbv1beta2.MainContainerName, command, nil, cmd.OutOrStdout(), cmd.OutOrStderr(), false)
if err != nil {
return err
}
}
}

cmd.Println("Killing fdbserver processes")
// Now all Pods must be restarted and the previous local cluster file must be deleted to make sure the fdbserver is picking the connection string from the seed cluster file (`/var/dynamic-conf/fdb.cluster`).
err = restartFdbserverInCluster(cmd.Context(), opts.client, opts.config, cluster)
@@ -302,41 +352,12 @@ func recoverMultiRegionCluster(cmd *cobra.Command, opts recoverMultiRegionCluste
command := []string{"fdbcli", "--exec", fmt.Sprintf("force_recovery_with_data_loss %s", cluster.Spec.DataCenter)}
// Now you can exec into a container and use `fdbcli` to connect to the cluster.
// If you use a multi-region cluster you have to issue `force_recovery_with_data_loss`
var attempts int
var failOverErr error
for attempts < 5 {
cmd.Println("Triggering force recovery with command:", command, "attempt:", attempts)
failOverErr = kubeHelper.ExecuteCommandRaw(cmd.Context(), opts.client, opts.config, runningCoordinator.Namespace, runningCoordinator.Name, fdbv1beta2.MainContainerName, command, nil, cmd.OutOrStdout(), cmd.OutOrStderr(), false)
if failOverErr != nil {
cmd.Println("failed:", failOverErr.Error(), "waiting 15 seconds")
time.Sleep(15 * time.Second)
attempts++
continue
}

break
}

if failOverErr != nil {
return failOverErr
}

newDatabaseConfiguration := cluster.Spec.DatabaseConfiguration.FailOver()
// Drop the multi-region configuration.
newDatabaseConfiguration.Regions = []fdbv1beta2.Region{
{
DataCenters: []fdbv1beta2.DataCenter{
{
ID: cluster.Spec.DataCenter,
},
},
},
}

err = updateDatabaseConfiguration(cmd.Context(), opts.client, cluster, newDatabaseConfiguration)
cmd.Println("Triggering force recovery with command:", command)
err = kubeHelper.ExecuteCommandRaw(cmd.Context(), opts.client, opts.config, runningCoordinator.Namespace, runningCoordinator.Name, fdbv1beta2.MainContainerName, command, nil, cmd.OutOrStdout(), cmd.OutOrStderr(), false)
if err != nil {
return err
}

// Now you can set `spec.Skip = false` to let the operator take over again.
// Skip the cluster, make sure the operator is not taking any action on the cluster.
err = setSkipReconciliation(cmd.Context(), opts.client, cluster, false)
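
The connection-string handling in this command relies on the usual FoundationDB format description:id@ip:port,ip:port,...: the code above keeps everything before the @ and the ConfigMap step replaces the address list with the coordinators that are still reachable plus the newly picked ones. A small sketch of that idea with placeholder values; the plugin's own helpers, which are not part of this diff, do the equivalent and also handle TLS addresses when useTLS is set:

// Sketch only: rebuilding a connection string from a kept prefix and a new coordinator list.
package main

import (
	"fmt"
	"strings"
)

// rebuildConnectionString keeps the description/ID prefix of the old
// connection string and swaps in the new coordinator address list.
func rebuildConnectionString(old string, coordinators []string) string {
	prefix := strings.Split(old, "@")[0]
	return prefix + "@" + strings.Join(coordinators, ",")
}

func main() {
	// Placeholder values, not taken from a real cluster.
	old := "sample_cluster:abc12345@10.1.0.1:4501,10.1.0.2:4501,10.1.0.3:4501"
	newCoordinators := []string{"10.1.0.1:4501", "10.2.0.4:4501", "10.2.0.5:4501"}
	fmt.Println(rebuildConnectionString(old, newCoordinators))
}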
