Skip to content

Commit

Permalink
fix: data injection to return errors
Browse files Browse the repository at this point in the history
  • Loading branch information
phillebaba committed Jul 16, 2024
1 parent eca0c53 commit 5a5b746
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 100 deletions.
182 changes: 92 additions & 90 deletions src/pkg/cluster/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
Expand All @@ -32,12 +31,10 @@ import (

// HandleDataInjection waits for the target pod(s) to come up and inject the data into them
// todo: this currently requires kubectl but we should have enough k8s work to make this native now.
func (c *Cluster) HandleDataInjection(ctx context.Context, wg *sync.WaitGroup, data types.ZarfDataInjection, componentPath *layout.ComponentPaths, dataIdx int) {
defer wg.Done()
func (c *Cluster) HandleDataInjection(ctx context.Context, data types.ZarfDataInjection, componentPath *layout.ComponentPaths, dataIdx int) error {

Check warning on line 34 in src/pkg/cluster/data.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/cluster/data.go#L34

Added line #L34 was not covered by tests
injectionCompletionMarker := filepath.Join(componentPath.DataInjections, config.GetDataInjectionMarker())
if err := os.WriteFile(injectionCompletionMarker, []byte("🦄"), helpers.ReadWriteUser); err != nil {
message.WarnErrf(err, "Unable to create the data injection completion marker")
return
return fmt.Errorf("unable to create the data injection completion marker: %w", err)

Check warning on line 37 in src/pkg/cluster/data.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/cluster/data.go#L37

Added line #L37 was not covered by tests
}

tarCompressFlag := ""
Expand All @@ -59,103 +56,110 @@ func (c *Cluster) HandleDataInjection(ctx context.Context, wg *sync.WaitGroup, d
shell, shellArgs := exec.GetOSShell(exec.Shell{Windows: "cmd"})

if _, _, err := exec.Cmd(shell, append(shellArgs, "tar --version")...); err != nil {
message.WarnErr(err, "Unable to execute tar on this system. Please ensure it is installed and on your $PATH.")
return
return fmt.Errorf("unable to execute tar, ensure it is installed in the $PATH: %w", err)

Check warning on line 59 in src/pkg/cluster/data.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/cluster/data.go#L59

Added line #L59 was not covered by tests
}

iterator:
// The eternal loop because some data injections can take a very long time
for {
message.Debugf("Attempting to inject data into %s", data.Target)
source := filepath.Join(componentPath.DataInjections, filepath.Base(data.Target.Path))
if helpers.InvalidPath(source) {
// The path is likely invalid because of how we compose OCI components, add an index suffix to the filename
source = filepath.Join(componentPath.DataInjections, strconv.Itoa(dataIdx), filepath.Base(data.Target.Path))
select {
case <-ctx.Done():
return ctx.Err()
default:
message.Debugf("Attempting to inject data into %s", data.Target)
source := filepath.Join(componentPath.DataInjections, filepath.Base(data.Target.Path))

Check warning on line 68 in src/pkg/cluster/data.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/cluster/data.go#L63-L68

Added lines #L63 - L68 were not covered by tests
if helpers.InvalidPath(source) {
message.Warnf("Unable to find the data injection source path %s", source)
return
// The path is likely invalid because of how we compose OCI components, add an index suffix to the filename
source = filepath.Join(componentPath.DataInjections, strconv.Itoa(dataIdx), filepath.Base(data.Target.Path))
if helpers.InvalidPath(source) {
return fmt.Errorf("could not find the data injection source path %s", source)

Check warning on line 73 in src/pkg/cluster/data.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/cluster/data.go#L71-L73

Added lines #L71 - L73 were not covered by tests
}
}
}

target := podLookup{
Namespace: data.Target.Namespace,
Selector: data.Target.Selector,
Container: data.Target.Container,
}

// Wait until the pod we are injecting data into becomes available
pods := waitForPodsAndContainers(ctx, c.Clientset, target, podFilterByInitContainer)
if len(pods) < 1 {
continue
}
target := podLookup{
Namespace: data.Target.Namespace,
Selector: data.Target.Selector,
Container: data.Target.Container,

Check warning on line 80 in src/pkg/cluster/data.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/cluster/data.go#L77-L80

Added lines #L77 - L80 were not covered by tests
}

// Inject into all the pods
for _, pod := range pods {
// Try to use the embedded kubectl if we can
zarfCommand, err := utils.GetFinalExecutableCommand()
kubectlBinPath := "kubectl"
// Wait until the pod we are injecting data into becomes available
pods, err := waitForPodsAndContainers(ctx, c.Clientset, target, podFilterByInitContainer)

Check warning on line 84 in src/pkg/cluster/data.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/cluster/data.go#L84

Added line #L84 was not covered by tests
if err != nil {
message.Warnf("Unable to get the zarf executable path, falling back to host kubectl: %s", err)
} else {
kubectlBinPath = fmt.Sprintf("%s tools kubectl", zarfCommand)
return err

Check warning on line 86 in src/pkg/cluster/data.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/cluster/data.go#L86

Added line #L86 was not covered by tests
}
if len(pods) < 1 {
continue

Check warning on line 89 in src/pkg/cluster/data.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/cluster/data.go#L88-L89

Added lines #L88 - L89 were not covered by tests
}
kubectlCmd := fmt.Sprintf("%s exec -i -n %s %s -c %s ", kubectlBinPath, data.Target.Namespace, pod.Name, data.Target.Container)

// Note that each command flag is separated to provide the widest cross-platform tar support
tarCmd := fmt.Sprintf("tar -c %s -f -", tarCompressFlag)
untarCmd := fmt.Sprintf("tar -x %s -v -f - -C %s", tarCompressFlag, data.Target.Path)
// Inject into all the pods
for _, pod := range pods {

Check warning on line 93 in src/pkg/cluster/data.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/cluster/data.go#L93

Added line #L93 was not covered by tests
// Try to use the embedded kubectl if we can
zarfCommand, err := utils.GetFinalExecutableCommand()
kubectlBinPath := "kubectl"
if err != nil {
message.Warnf("Unable to get the zarf executable path, falling back to host kubectl: %s", err)
} else {
kubectlBinPath = fmt.Sprintf("%s tools kubectl", zarfCommand)

Check warning on line 100 in src/pkg/cluster/data.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/cluster/data.go#L95-L100

Added lines #L95 - L100 were not covered by tests
}
kubectlCmd := fmt.Sprintf("%s exec -i -n %s %s -c %s ", kubectlBinPath, data.Target.Namespace, pod.Name, data.Target.Container)

Check warning on line 102 in src/pkg/cluster/data.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/cluster/data.go#L102

Added line #L102 was not covered by tests

// Must create the target directory before trying to change to it for untar
mkdirCmd := fmt.Sprintf("%s -- mkdir -p %s", kubectlCmd, data.Target.Path)
if err := exec.CmdWithPrint(shell, append(shellArgs, mkdirCmd)...); err != nil {
message.Warnf("Unable to create the data injection target directory %s in pod %s", data.Target.Path, pod.Name)
continue iterator
}
// Note that each command flag is separated to provide the widest cross-platform tar support
tarCmd := fmt.Sprintf("tar -c %s -f -", tarCompressFlag)
untarCmd := fmt.Sprintf("tar -x %s -v -f - -C %s", tarCompressFlag, data.Target.Path)

Check warning on line 106 in src/pkg/cluster/data.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/cluster/data.go#L105-L106

Added lines #L105 - L106 were not covered by tests

cpPodCmd := fmt.Sprintf("%s -C %s . | %s -- %s",
tarCmd,
source,
kubectlCmd,
untarCmd,
)

// Do the actual data injection
if err := exec.CmdWithPrint(shell, append(shellArgs, cpPodCmd)...); err != nil {
message.Warnf("Error copying data into the pod %#v: %#v\n", pod.Name, err)
continue iterator
}
// Must create the target directory before trying to change to it for untar
mkdirCmd := fmt.Sprintf("%s -- mkdir -p %s", kubectlCmd, data.Target.Path)
if err := exec.CmdWithPrint(shell, append(shellArgs, mkdirCmd)...); err != nil {
return fmt.Errorf("unable to create the data injection target directory %s in pod %s: %w", data.Target.Path, pod.Name, err)

Check warning on line 111 in src/pkg/cluster/data.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/cluster/data.go#L109-L111

Added lines #L109 - L111 were not covered by tests
}

cpPodCmd := fmt.Sprintf("%s -C %s . | %s -- %s",
tarCmd,
source,
kubectlCmd,
untarCmd,
)

Check warning on line 119 in src/pkg/cluster/data.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/cluster/data.go#L114-L119

Added lines #L114 - L119 were not covered by tests

// Leave a marker in the target container for pods to track the sync action
cpPodCmd = fmt.Sprintf("%s -C %s %s | %s -- %s",
tarCmd,
componentPath.DataInjections,
config.GetDataInjectionMarker(),
kubectlCmd,
untarCmd,
)

if err := exec.CmdWithPrint(shell, append(shellArgs, cpPodCmd)...); err != nil {
message.Warnf("Error saving the zarf sync completion file after injection into pod %#v\n", pod.Name)
continue iterator
// Do the actual data injection
if err := exec.CmdWithPrint(shell, append(shellArgs, cpPodCmd)...); err != nil {
return fmt.Errorf("could not copy data into the pod %s: %w", pod.Name, err)

Check warning on line 123 in src/pkg/cluster/data.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/cluster/data.go#L122-L123

Added lines #L122 - L123 were not covered by tests
}

// Leave a marker in the target container for pods to track the sync action
cpPodCmd = fmt.Sprintf("%s -C %s %s | %s -- %s",
tarCmd,
componentPath.DataInjections,
config.GetDataInjectionMarker(),
kubectlCmd,
untarCmd,
)

Check warning on line 133 in src/pkg/cluster/data.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/cluster/data.go#L127-L133

Added lines #L127 - L133 were not covered by tests

if err := exec.CmdWithPrint(shell, append(shellArgs, cpPodCmd)...); err != nil {
return fmt.Errorf("could not save the Zarf sync completion file after injection into pod %s: %w", pod.Name, err)

Check warning on line 136 in src/pkg/cluster/data.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/cluster/data.go#L135-L136

Added lines #L135 - L136 were not covered by tests
}
}
}

// Do not look for a specific container after injection in case they are running an init container
podOnlyTarget := podLookup{
Namespace: data.Target.Namespace,
Selector: data.Target.Selector,
}
// Do not look for a specific container after injection in case they are running an init container
podOnlyTarget := podLookup{
Namespace: data.Target.Namespace,
Selector: data.Target.Selector,

Check warning on line 143 in src/pkg/cluster/data.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/cluster/data.go#L141-L143

Added lines #L141 - L143 were not covered by tests
}

// Block one final time to make sure at least one pod has come up and injected the data
// Using only the pod as the final selector because we don't know what the container name will be
// Still using the init container filter to make sure we have the right running pod
_ = waitForPodsAndContainers(ctx, c.Clientset, podOnlyTarget, podFilterByInitContainer)
// Block one final time to make sure at least one pod has come up and injected the data
// Using only the pod as the final selector because we don't know what the container name will be
// Still using the init container filter to make sure we have the right running pod
_, err = waitForPodsAndContainers(ctx, c.Clientset, podOnlyTarget, podFilterByInitContainer)
if err != nil {
return err

Check warning on line 151 in src/pkg/cluster/data.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/cluster/data.go#L149-L151

Added lines #L149 - L151 were not covered by tests
}

// Cleanup now to reduce disk pressure
_ = os.RemoveAll(source)
// Cleanup now to reduce disk pressure
err = os.RemoveAll(source)
if err != nil {
return err

Check warning on line 157 in src/pkg/cluster/data.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/cluster/data.go#L155-L157

Added lines #L155 - L157 were not covered by tests
}

// Return to stop the loop
return
// Return to stop the loop
return nil

Check warning on line 161 in src/pkg/cluster/data.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/cluster/data.go#L161

Added line #L161 was not covered by tests
}
}
}

Expand All @@ -173,7 +177,7 @@ type podFilter func(pod corev1.Pod) bool
// It will wait up to 90 seconds for the pods to be found and will return a list of matching pod names
// If the timeout is reached, an empty list will be returned.
// TODO: Test, refactor and/or remove.
func waitForPodsAndContainers(ctx context.Context, clientset kubernetes.Interface, target podLookup, include podFilter) []corev1.Pod {
func waitForPodsAndContainers(ctx context.Context, clientset kubernetes.Interface, target podLookup, include podFilter) ([]corev1.Pod, error) {

Check warning on line 180 in src/pkg/cluster/data.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/cluster/data.go#L180

Added line #L180 was not covered by tests
waitCtx, cancel := context.WithTimeout(ctx, 90*time.Second)
defer cancel()

Expand All @@ -183,16 +187,14 @@ func waitForPodsAndContainers(ctx context.Context, clientset kubernetes.Interfac
for {
select {
case <-waitCtx.Done():
message.Debug("Pod lookup failed: %v", ctx.Err())
return nil
return nil, ctx.Err()

Check warning on line 190 in src/pkg/cluster/data.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/cluster/data.go#L190

Added line #L190 was not covered by tests
case <-timer.C:
listOpts := metav1.ListOptions{
LabelSelector: target.Selector,
}
podList, err := clientset.CoreV1().Pods(target.Namespace).List(ctx, listOpts)
if err != nil {
message.Debug("Unable to find matching pods: %w", err)
return nil
return nil, err

Check warning on line 197 in src/pkg/cluster/data.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/cluster/data.go#L197

Added line #L197 was not covered by tests
}

message.Debug("Found %d pods for target %#v", len(podList.Items), target)
Expand Down Expand Up @@ -245,7 +247,7 @@ func waitForPodsAndContainers(ctx context.Context, clientset kubernetes.Interfac
}
}
if len(readyPods) > 0 {
return readyPods
return readyPods, nil

Check warning on line 250 in src/pkg/cluster/data.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/cluster/data.go#L250

Added line #L250 was not covered by tests
}
timer.Reset(3 * time.Second)
}
Expand Down
21 changes: 11 additions & 10 deletions src/pkg/packager/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"time"

"golang.org/x/sync/errgroup"

corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -288,7 +289,6 @@ func (p *Packager) deployComponent(ctx context.Context, component types.ZarfComp
hasCharts := len(component.Charts) > 0
hasManifests := len(component.Manifests) > 0
hasRepos := len(component.Repos) > 0
hasDataInjections := len(component.DataInjections) > 0
hasFiles := len(component.Files) > 0

onDeploy := component.Actions.OnDeploy
Expand Down Expand Up @@ -339,14 +339,11 @@ func (p *Packager) deployComponent(ctx context.Context, component types.ZarfComp
}
}

if hasDataInjections {
waitGroup := sync.WaitGroup{}
defer waitGroup.Wait()

for idx, data := range component.DataInjections {
waitGroup.Add(1)
go p.cluster.HandleDataInjection(ctx, &waitGroup, data, componentPath, idx)
}
g, gCtx := errgroup.WithContext(ctx)
for idx, data := range component.DataInjections {
g.Go(func() error {
return p.cluster.HandleDataInjection(gCtx, data, componentPath, idx)
})

Check warning on line 346 in src/pkg/packager/deploy.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/packager/deploy.go#L342-L346

Added lines #L342 - L346 were not covered by tests
}

if hasCharts || hasManifests {
Expand All @@ -359,6 +356,10 @@ func (p *Packager) deployComponent(ctx context.Context, component types.ZarfComp
return charts, fmt.Errorf("unable to run component after action: %w", err)
}

err = g.Wait()
if err != nil {
return nil, err

Check warning on line 361 in src/pkg/packager/deploy.go

View check run for this annotation

Codecov / codecov/patch

src/pkg/packager/deploy.go#L359-L361

Added lines #L359 - L361 were not covered by tests
}
return charts, nil
}

Expand Down

0 comments on commit 5a5b746

Please sign in to comment.