From 28552258ae28a6ae46ba6dd7311652922fbc39d7 Mon Sep 17 00:00:00 2001 From: allenxu404 Date: Wed, 3 Apr 2024 11:28:04 +0800 Subject: [PATCH] Wait for results of restore exec hook executions in Finalizing phase instead of InProgress phase Signed-off-by: allenxu404 --- changelogs/unreleased/7619-allenxu404 | 1 + internal/hook/hook_tracker.go | 167 ++++++++++++++---- internal/hook/hook_tracker_test.go | 137 ++++++++++++-- internal/hook/item_hook_handler.go | 11 +- internal/hook/item_hook_handler_test.go | 23 +-- internal/hook/wait_exec_hook_handler.go | 12 +- internal/hook/wait_exec_hook_handler_test.go | 31 ++-- pkg/backup/backup.go | 2 +- pkg/cmd/server/server.go | 5 + .../restore_finalizer_controller.go | 71 ++++++-- .../restore_finalizer_controller_test.go | 100 +++++++++++ pkg/restore/restore.go | 150 ++++++++-------- 12 files changed, 538 insertions(+), 172 deletions(-) create mode 100644 changelogs/unreleased/7619-allenxu404 diff --git a/changelogs/unreleased/7619-allenxu404 b/changelogs/unreleased/7619-allenxu404 new file mode 100644 index 0000000000..fc7b1fba5e --- /dev/null +++ b/changelogs/unreleased/7619-allenxu404 @@ -0,0 +1 @@ +Wait for results of restore exec hook executions in Finalizing phase instead of InProgress phase \ No newline at end of file diff --git a/internal/hook/hook_tracker.go b/internal/hook/hook_tracker.go index a0300d8f63..feffabf350 100644 --- a/internal/hook/hook_tracker.go +++ b/internal/hook/hook_tracker.go @@ -26,8 +26,8 @@ const ( HookSourceSpec = "spec" ) -// hookTrackerKey identifies a backup/restore hook -type hookTrackerKey struct { +// hookKey identifies a backup/restore hook +type hookKey struct { // PodNamespace indicates the namespace of pod where hooks are executed. // For hooks specified in the backup/restore spec, this field is the namespace of an applicable pod. // For hooks specified in pod annotation, this field is the namespace of pod where hooks are annotated. @@ -48,37 +48,46 @@ type hookTrackerKey struct { container string } -// hookTrackerVal records the execution status of a specific hook. -// hookTrackerVal is extensible to accommodate additional fields as needs develop. -type hookTrackerVal struct { +// hookStatus records the execution status of a specific hook. +// hookStatus is extensible to accommodate additional fields as needs develop. +type hookStatus struct { // HookFailed indicates if hook failed to execute. hookFailed bool // hookExecuted indicates if hook already execute. hookExecuted bool } -// HookTracker tracks all hooks' execution status +// HookTracker tracks all hooks' execution status in a single backup/restore. type HookTracker struct { - lock *sync.RWMutex - tracker map[hookTrackerKey]hookTrackerVal + lock *sync.RWMutex + // tracker records all hook info for a single backup/restore. + tracker map[hookKey]hookStatus + // hookAttemptedCnt indicates the number of attempted hooks. + hookAttemptedCnt int + // hookFailedCnt indicates the number of failed hooks. + hookFailedCnt int + // HookExecutedCnt indicates the number of executed hooks. + hookExecutedCnt int + // hookErrs records hook execution errors if any. + hookErrs []HookErrInfo } -// NewHookTracker creates a hookTracker. +// NewHookTracker creates a hookTracker instance. func NewHookTracker() *HookTracker { return &HookTracker{ lock: &sync.RWMutex{}, - tracker: make(map[hookTrackerKey]hookTrackerVal), + tracker: make(map[hookKey]hookStatus), } } -// Add adds a hook to the tracker +// Add adds a hook to the hook tracker // Add must precede the Record for each individual hook. // In other words, a hook must be added to the tracker before its execution result is recorded. func (ht *HookTracker) Add(podNamespace, podName, container, source, hookName string, hookPhase hookPhase) { ht.lock.Lock() defer ht.lock.Unlock() - key := hookTrackerKey{ + key := hookKey{ podNamespace: podNamespace, podName: podName, hookSource: source, @@ -88,21 +97,22 @@ func (ht *HookTracker) Add(podNamespace, podName, container, source, hookName st } if _, ok := ht.tracker[key]; !ok { - ht.tracker[key] = hookTrackerVal{ + ht.tracker[key] = hookStatus{ hookFailed: false, hookExecuted: false, } + ht.hookAttemptedCnt++ } } // Record records the hook's execution status // Add must precede the Record for each individual hook. // In other words, a hook must be added to the tracker before its execution result is recorded. -func (ht *HookTracker) Record(podNamespace, podName, container, source, hookName string, hookPhase hookPhase, hookFailed bool) error { +func (ht *HookTracker) Record(podNamespace, podName, container, source, hookName string, hookPhase hookPhase, hookFailed bool, hookErr error) error { ht.lock.Lock() defer ht.lock.Unlock() - key := hookTrackerKey{ + key := hookKey{ podNamespace: podNamespace, podName: podName, hookSource: source, @@ -111,38 +121,125 @@ func (ht *HookTracker) Record(podNamespace, podName, container, source, hookName hookName: hookName, } - var err error - if _, ok := ht.tracker[key]; ok { - ht.tracker[key] = hookTrackerVal{ + if _, ok := ht.tracker[key]; !ok { + return fmt.Errorf("hook not exist in hook tracker, hook: %+v", key) + } + + if !ht.tracker[key].hookExecuted { + ht.tracker[key] = hookStatus{ hookFailed: hookFailed, hookExecuted: true, } - } else { - err = fmt.Errorf("hook not exist in hooks tracker, hook key: %v", key) + ht.hookExecutedCnt++ + if hookFailed { + ht.hookFailedCnt++ + ht.hookErrs = append(ht.hookErrs, HookErrInfo{Namespace: key.podNamespace, Err: hookErr}) + } } - return err + return nil } -// Stat calculates the number of attempted hooks and failed hooks -func (ht *HookTracker) Stat() (hookAttemptedCnt int, hookFailed int) { +// Stat returns the number of attempted hooks and failed hooks +func (ht *HookTracker) Stat() (hookAttemptedCnt int, hookFailedCnt int) { ht.lock.RLock() defer ht.lock.RUnlock() - for _, hookInfo := range ht.tracker { - if hookInfo.hookExecuted { - hookAttemptedCnt++ - if hookInfo.hookFailed { - hookFailed++ - } - } - } - return + return ht.hookAttemptedCnt, ht.hookFailedCnt +} + +// IsComplete returns whether the execution of all hooks has finished or not +func (ht *HookTracker) IsComplete() bool { + ht.lock.RLock() + defer ht.lock.RUnlock() + + return ht.hookAttemptedCnt == ht.hookExecutedCnt } -// GetTracker gets the tracker inside HookTracker -func (ht *HookTracker) GetTracker() map[hookTrackerKey]hookTrackerVal { +// HooksErr returns hook execution errors +func (ht *HookTracker) HookErrs() []HookErrInfo { ht.lock.RLock() defer ht.lock.RUnlock() - return ht.tracker + return ht.hookErrs +} + +// MultiHookTrackers tracks all hooks' execution status for multiple backups/restores. +type MultiHookTracker struct { + lock *sync.RWMutex + // trackers is a map that uses the backup/restore name as the key and stores a HookTracker as value. + trackers map[string]*HookTracker +} + +// NewMultiHookTracker creates a multiHookTracker instance. +func NewMultiHookTracker() *MultiHookTracker { + return &MultiHookTracker{ + lock: &sync.RWMutex{}, + trackers: make(map[string]*HookTracker), + } +} + +// Add adds a backup/restore hook to the tracker +func (mht *MultiHookTracker) Add(name, podNamespace, podName, container, source, hookName string, hookPhase hookPhase) { + mht.lock.Lock() + defer mht.lock.Unlock() + + if _, ok := mht.trackers[name]; !ok { + mht.trackers[name] = NewHookTracker() + } + mht.trackers[name].Add(podNamespace, podName, container, source, hookName, hookPhase) +} + +// Record records a backup/restore hook execution status +func (mht *MultiHookTracker) Record(name, podNamespace, podName, container, source, hookName string, hookPhase hookPhase, hookFailed bool, hookErr error) error { + mht.lock.RLock() + defer mht.lock.RUnlock() + + var err error + if _, ok := mht.trackers[name]; ok { + err = mht.trackers[name].Record(podNamespace, podName, container, source, hookName, hookPhase, hookFailed, hookErr) + } else { + err = fmt.Errorf("the backup/restore not exist in hook tracker, backup/restore name: %s", name) + } + return err +} + +// Stat returns the number of attempted hooks and failed hooks for a particular backup/restore +func (mht *MultiHookTracker) Stat(name string) (hookAttemptedCnt int, hookFailedCnt int) { + mht.lock.RLock() + defer mht.lock.RUnlock() + + if _, ok := mht.trackers[name]; ok { + return mht.trackers[name].Stat() + } + return +} + +// Delete removes the hook data for a particular backup/restore +func (mht *MultiHookTracker) Delete(name string) { + mht.lock.Lock() + defer mht.lock.Unlock() + + delete(mht.trackers, name) +} + +// IsComplete returns whether the execution of all hooks for a particular backup/restore has finished or not +func (mht *MultiHookTracker) IsComplete(name string) bool { + mht.lock.RLock() + defer mht.lock.RUnlock() + + if _, ok := mht.trackers[name]; ok { + return mht.trackers[name].IsComplete() + } + return true +} + +// HooksErr returns hook execution errors for a particular backup/restore +func (mht *MultiHookTracker) HookErrs(name string) []HookErrInfo { + mht.lock.RLock() + defer mht.lock.RUnlock() + + if _, ok := mht.trackers[name]; ok { + return mht.trackers[name].HookErrs() + } + return nil } diff --git a/internal/hook/hook_tracker_test.go b/internal/hook/hook_tracker_test.go index dde7b85210..b0ea1911a9 100644 --- a/internal/hook/hook_tracker_test.go +++ b/internal/hook/hook_tracker_test.go @@ -17,6 +17,7 @@ limitations under the License. package hook import ( + "fmt" "testing" "github.com/stretchr/testify/assert" @@ -32,13 +33,13 @@ func TestNewHookTracker(t *testing.T) { func TestHookTracker_Add(t *testing.T) { tracker := NewHookTracker() - tracker.Add("ns1", "pod1", "container1", HookSourceAnnotation, "h1", PhasePre) + tracker.Add("ns1", "pod1", "container1", HookSourceAnnotation, "h1", "") - key := hookTrackerKey{ + key := hookKey{ podNamespace: "ns1", podName: "pod1", container: "container1", - hookPhase: PhasePre, + hookPhase: "", hookSource: HookSourceAnnotation, hookName: "h1", } @@ -49,44 +50,148 @@ func TestHookTracker_Add(t *testing.T) { func TestHookTracker_Record(t *testing.T) { tracker := NewHookTracker() - tracker.Add("ns1", "pod1", "container1", HookSourceAnnotation, "h1", PhasePre) - err := tracker.Record("ns1", "pod1", "container1", HookSourceAnnotation, "h1", PhasePre, true) + tracker.Add("ns1", "pod1", "container1", HookSourceAnnotation, "h1", "") + err := tracker.Record("ns1", "pod1", "container1", HookSourceAnnotation, "h1", "", true, fmt.Errorf("err")) - key := hookTrackerKey{ + key := hookKey{ podNamespace: "ns1", podName: "pod1", container: "container1", - hookPhase: PhasePre, + hookPhase: "", hookSource: HookSourceAnnotation, hookName: "h1", } info := tracker.tracker[key] assert.True(t, info.hookFailed) + assert.True(t, info.hookExecuted) assert.Nil(t, err) - err = tracker.Record("ns2", "pod2", "container1", HookSourceAnnotation, "h1", PhasePre, true) + err = tracker.Record("ns2", "pod2", "container1", HookSourceAnnotation, "h1", "", true, fmt.Errorf("err")) assert.NotNil(t, err) + + err = tracker.Record("ns1", "pod1", "container1", HookSourceAnnotation, "h1", "", false, nil) + assert.Nil(t, err) + assert.True(t, info.hookFailed) } func TestHookTracker_Stat(t *testing.T) { tracker := NewHookTracker() - tracker.Add("ns1", "pod1", "container1", HookSourceAnnotation, "h1", PhasePre) - tracker.Add("ns2", "pod2", "container1", HookSourceAnnotation, "h2", PhasePre) - tracker.Record("ns1", "pod1", "container1", HookSourceAnnotation, "h1", PhasePre, true) + tracker.Add("ns1", "pod1", "container1", HookSourceAnnotation, "h1", "") + tracker.Add("ns2", "pod2", "container1", HookSourceAnnotation, "h2", "") + tracker.Record("ns1", "pod1", "container1", HookSourceAnnotation, "h1", "", true, fmt.Errorf("err")) attempted, failed := tracker.Stat() - assert.Equal(t, 1, attempted) + assert.Equal(t, 2, attempted) assert.Equal(t, 1, failed) } -func TestHookTracker_Get(t *testing.T) { +func TestHookTracker_IsComplete(t *testing.T) { tracker := NewHookTracker() tracker.Add("ns1", "pod1", "container1", HookSourceAnnotation, "h1", PhasePre) + tracker.Record("ns1", "pod1", "container1", HookSourceAnnotation, "h1", PhasePre, true, fmt.Errorf("err")) + assert.True(t, tracker.IsComplete()) + + tracker.Add("ns1", "pod1", "container1", HookSourceAnnotation, "h1", "") + assert.False(t, tracker.IsComplete()) +} + +func TestHookTracker_HookErrs(t *testing.T) { + tracker := NewHookTracker() + tracker.Add("ns1", "pod1", "container1", HookSourceAnnotation, "h1", "") + tracker.Record("ns1", "pod1", "container1", HookSourceAnnotation, "h1", "", true, fmt.Errorf("err")) + + hookErrs := tracker.HookErrs() + assert.Len(t, hookErrs, 1) +} + +func TestMultiHookTracker_Add(t *testing.T) { + mht := NewMultiHookTracker() + + mht.Add("restore1", "ns1", "pod1", "container1", HookSourceAnnotation, "h1", "") + + key := hookKey{ + podNamespace: "ns1", + podName: "pod1", + container: "container1", + hookPhase: "", + hookSource: HookSourceAnnotation, + hookName: "h1", + } + + _, ok := mht.trackers["restore1"].tracker[key] + assert.True(t, ok) +} + +func TestMultiHookTracker_Record(t *testing.T) { + mht := NewMultiHookTracker() + mht.Add("restore1", "ns1", "pod1", "container1", HookSourceAnnotation, "h1", "") + err := mht.Record("restore1", "ns1", "pod1", "container1", HookSourceAnnotation, "h1", "", true, fmt.Errorf("err")) + + key := hookKey{ + podNamespace: "ns1", + podName: "pod1", + container: "container1", + hookPhase: "", + hookSource: HookSourceAnnotation, + hookName: "h1", + } + + info := mht.trackers["restore1"].tracker[key] + assert.True(t, info.hookFailed) + assert.True(t, info.hookExecuted) + assert.Nil(t, err) + + err = mht.Record("restore1", "ns2", "pod2", "container1", HookSourceAnnotation, "h1", "", true, fmt.Errorf("err")) + assert.NotNil(t, err) + + err = mht.Record("restore2", "ns2", "pod2", "container1", HookSourceAnnotation, "h1", "", true, fmt.Errorf("err")) + assert.NotNil(t, err) +} + +func TestMultiHookTracker_Stat(t *testing.T) { + mht := NewMultiHookTracker() + + mht.Add("restore1", "ns1", "pod1", "container1", HookSourceAnnotation, "h1", "") + mht.Add("restore1", "ns2", "pod2", "container1", HookSourceAnnotation, "h2", "") + mht.Record("restore1", "ns1", "pod1", "container1", HookSourceAnnotation, "h1", "", true, fmt.Errorf("err")) + mht.Record("restore1", "ns2", "pod2", "container1", HookSourceAnnotation, "h2", "", false, nil) + + attempted, failed := mht.Stat("restore1") + assert.Equal(t, 2, attempted) + assert.Equal(t, 1, failed) +} + +func TestMultiHookTracker_Delete(t *testing.T) { + mht := NewMultiHookTracker() + mht.Add("restore1", "ns1", "pod1", "container1", HookSourceAnnotation, "h1", "") + mht.Delete("restore1") + + _, ok := mht.trackers["restore1"] + assert.False(t, ok) +} + +func TestMultiHookTracker_IsComplete(t *testing.T) { + mht := NewMultiHookTracker() + mht.Add("backup1", "ns1", "pod1", "container1", HookSourceAnnotation, "h1", PhasePre) + mht.Record("backup1", "ns1", "pod1", "container1", HookSourceAnnotation, "h1", PhasePre, true, fmt.Errorf("err")) + assert.True(t, mht.IsComplete("backup1")) + + mht.Add("restore1", "ns1", "pod1", "container1", HookSourceAnnotation, "h1", "") + assert.False(t, mht.IsComplete("restore1")) + + assert.True(t, mht.IsComplete("restore2")) +} + +func TestMultiHookTracker_HookErrs(t *testing.T) { + mht := NewMultiHookTracker() + mht.Add("restore1", "ns1", "pod1", "container1", HookSourceAnnotation, "h1", "") + mht.Record("restore1", "ns1", "pod1", "container1", HookSourceAnnotation, "h1", "", true, fmt.Errorf("err")) - tr := tracker.GetTracker() - assert.NotNil(t, tr) + hookErrs := mht.HookErrs("restore1") + assert.Len(t, hookErrs, 1) - t.Logf("tracker :%+v", tr) + hookErrs2 := mht.HookErrs("restore2") + assert.Empty(t, hookErrs2) } diff --git a/internal/hook/item_hook_handler.go b/internal/hook/item_hook_handler.go index 8120386d60..65c47e56ec 100644 --- a/internal/hook/item_hook_handler.go +++ b/internal/hook/item_hook_handler.go @@ -239,7 +239,7 @@ func (h *DefaultItemHookHandler) HandleHooks( hookLog.WithError(errExec).Error("Error executing hook") hookFailed = true } - errTracker := hookTracker.Record(namespace, name, hookFromAnnotations.Container, HookSourceAnnotation, "", phase, hookFailed) + errTracker := hookTracker.Record(namespace, name, hookFromAnnotations.Container, HookSourceAnnotation, "", phase, hookFailed, errExec) if errTracker != nil { hookLog.WithError(errTracker).Warn("Error recording the hook in hook tracker") } @@ -291,7 +291,7 @@ func (h *DefaultItemHookHandler) HandleHooks( modeFailError = err } } - errTracker := hookTracker.Record(namespace, name, hook.Exec.Container, HookSourceSpec, resourceHook.Name, phase, hookFailed) + errTracker := hookTracker.Record(namespace, name, hook.Exec.Container, HookSourceSpec, resourceHook.Name, phase, hookFailed, err) if errTracker != nil { hookLog.WithError(errTracker).Warn("Error recording the hook in hook tracker") } @@ -540,10 +540,11 @@ type PodExecRestoreHook struct { // container name. If an exec hook is defined in annotation that is used, else applicable exec // hooks from the restore resource are accumulated. func GroupRestoreExecHooks( + restoreName string, resourceRestoreHooks []ResourceRestoreHook, pod *corev1api.Pod, log logrus.FieldLogger, - hookTrack *HookTracker, + hookTrack *MultiHookTracker, ) (map[string][]PodExecRestoreHook, error) { byContainer := map[string][]PodExecRestoreHook{} @@ -560,7 +561,7 @@ func GroupRestoreExecHooks( if hookFromAnnotation.Container == "" { hookFromAnnotation.Container = pod.Spec.Containers[0].Name } - hookTrack.Add(metadata.GetNamespace(), metadata.GetName(), hookFromAnnotation.Container, HookSourceAnnotation, "", hookPhase("")) + hookTrack.Add(restoreName, metadata.GetNamespace(), metadata.GetName(), hookFromAnnotation.Container, HookSourceAnnotation, "", hookPhase("")) byContainer[hookFromAnnotation.Container] = []PodExecRestoreHook{ { HookName: "", @@ -595,7 +596,7 @@ func GroupRestoreExecHooks( if named.Hook.Container == "" { named.Hook.Container = pod.Spec.Containers[0].Name } - hookTrack.Add(metadata.GetNamespace(), metadata.GetName(), named.Hook.Container, HookSourceSpec, rrh.Name, hookPhase("")) + hookTrack.Add(restoreName, metadata.GetNamespace(), metadata.GetName(), named.Hook.Container, HookSourceSpec, rrh.Name, hookPhase("")) byContainer[named.Hook.Container] = append(byContainer[named.Hook.Container], named) } } diff --git a/internal/hook/item_hook_handler_test.go b/internal/hook/item_hook_handler_test.go index 6ab9b1528f..f60110a5de 100644 --- a/internal/hook/item_hook_handler_test.go +++ b/internal/hook/item_hook_handler_test.go @@ -1195,10 +1195,10 @@ func TestGroupRestoreExecHooks(t *testing.T) { }, } - hookTracker := NewHookTracker() + hookTracker := NewMultiHookTracker() for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - actual, err := GroupRestoreExecHooks(tc.resourceRestoreHooks, tc.pod, velerotest.NewLogger(), hookTracker) + actual, err := GroupRestoreExecHooks("restore1", tc.resourceRestoreHooks, tc.pod, velerotest.NewLogger(), hookTracker) assert.Nil(t, err) assert.Equal(t, tc.expected, actual) }) @@ -2108,7 +2108,7 @@ func TestBackupHookTracker(t *testing.T) { phase: PhasePre, groupResource: "pods", hookTracker: NewHookTracker(), - expectedHookAttempted: 3, + expectedHookAttempted: 4, expectedHookFailed: 2, pods: []podWithHook{ { @@ -2364,14 +2364,14 @@ func TestRestoreHookTrackerAdd(t *testing.T) { name string resourceRestoreHooks []ResourceRestoreHook pod *corev1api.Pod - hookTracker *HookTracker + hookTracker *MultiHookTracker expectedCnt int }{ { name: "neither spec hooks nor annotations hooks are set", resourceRestoreHooks: nil, pod: builder.ForPod("default", "my-pod").Result(), - hookTracker: NewHookTracker(), + hookTracker: NewMultiHookTracker(), expectedCnt: 0, }, { @@ -2390,7 +2390,7 @@ func TestRestoreHookTrackerAdd(t *testing.T) { Name: "container1", }). Result(), - hookTracker: NewHookTracker(), + hookTracker: NewMultiHookTracker(), expectedCnt: 1, }, { @@ -2428,7 +2428,7 @@ func TestRestoreHookTrackerAdd(t *testing.T) { Name: "container2", }). Result(), - hookTracker: NewHookTracker(), + hookTracker: NewMultiHookTracker(), expectedCnt: 2, }, { @@ -2463,15 +2463,18 @@ func TestRestoreHookTrackerAdd(t *testing.T) { Name: "container1", }). Result(), - hookTracker: NewHookTracker(), + hookTracker: NewMultiHookTracker(), expectedCnt: 1, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - _, _ = GroupRestoreExecHooks(tc.resourceRestoreHooks, tc.pod, velerotest.NewLogger(), tc.hookTracker) - tracker := tc.hookTracker.GetTracker() + _, _ = GroupRestoreExecHooks("restore1", tc.resourceRestoreHooks, tc.pod, velerotest.NewLogger(), tc.hookTracker) + if _, ok := tc.hookTracker.trackers["restore1"]; !ok { + return + } + tracker := tc.hookTracker.trackers["restore1"].tracker assert.Len(t, tracker, tc.expectedCnt) }) } diff --git a/internal/hook/wait_exec_hook_handler.go b/internal/hook/wait_exec_hook_handler.go index 1ca2eea8ab..fb87a7911c 100644 --- a/internal/hook/wait_exec_hook_handler.go +++ b/internal/hook/wait_exec_hook_handler.go @@ -39,7 +39,8 @@ type WaitExecHookHandler interface { log logrus.FieldLogger, pod *v1.Pod, byContainer map[string][]PodExecRestoreHook, - hookTrack *HookTracker, + multiHookTracker *MultiHookTracker, + restoreName string, ) []error } @@ -74,7 +75,8 @@ func (e *DefaultWaitExecHookHandler) HandleHooks( log logrus.FieldLogger, pod *v1.Pod, byContainer map[string][]PodExecRestoreHook, - hookTracker *HookTracker, + multiHookTracker *MultiHookTracker, + restoreName string, ) []error { if pod == nil { return nil @@ -167,7 +169,7 @@ func (e *DefaultWaitExecHookHandler) HandleHooks( hookLog.Error(err) errors = append(errors, err) - errTracker := hookTracker.Record(newPod.Namespace, newPod.Name, hook.Hook.Container, hook.HookSource, hook.HookName, hookPhase(""), true) + errTracker := multiHookTracker.Record(restoreName, newPod.Namespace, newPod.Name, hook.Hook.Container, hook.HookSource, hook.HookName, hookPhase(""), true, err) if errTracker != nil { hookLog.WithError(errTracker).Warn("Error recording the hook in hook tracker") } @@ -193,7 +195,7 @@ func (e *DefaultWaitExecHookHandler) HandleHooks( hookFailed = true } - errTracker := hookTracker.Record(newPod.Namespace, newPod.Name, hook.Hook.Container, hook.HookSource, hook.HookName, hookPhase(""), hookFailed) + errTracker := multiHookTracker.Record(restoreName, newPod.Namespace, newPod.Name, hook.Hook.Container, hook.HookSource, hook.HookName, hookPhase(""), hookFailed, hookErr) if errTracker != nil { hookLog.WithError(errTracker).Warn("Error recording the hook in hook tracker") } @@ -245,7 +247,7 @@ func (e *DefaultWaitExecHookHandler) HandleHooks( }, ) - errTracker := hookTracker.Record(pod.Namespace, pod.Name, hook.Hook.Container, hook.HookSource, hook.HookName, hookPhase(""), true) + errTracker := multiHookTracker.Record(restoreName, pod.Namespace, pod.Name, hook.Hook.Container, hook.HookSource, hook.HookName, hookPhase(""), true, err) if errTracker != nil { hookLog.WithError(errTracker).Warn("Error recording the hook in hook tracker") } diff --git a/internal/hook/wait_exec_hook_handler_test.go b/internal/hook/wait_exec_hook_handler_test.go index 2f8d7ad7ac..e615d1307e 100644 --- a/internal/hook/wait_exec_hook_handler_test.go +++ b/internal/hook/wait_exec_hook_handler_test.go @@ -743,8 +743,8 @@ func TestWaitExecHandleHooks(t *testing.T) { defer ctxCancel() } - hookTracker := NewHookTracker() - errs := h.HandleHooks(ctx, velerotest.NewLogger(), test.initialPod, test.byContainer, hookTracker) + hookTracker := NewMultiHookTracker() + errs := h.HandleHooks(ctx, velerotest.NewLogger(), test.initialPod, test.byContainer, hookTracker, "restore1") // for i, ee := range test.expectedErrors { require.Len(t, errs, len(test.expectedErrors)) @@ -1011,15 +1011,18 @@ func TestRestoreHookTrackerUpdate(t *testing.T) { pod *v1.Pod } - hookTracker1 := NewHookTracker() - hookTracker1.Add("default", "my-pod", "container1", HookSourceAnnotation, "", hookPhase("")) + hookTracker1 := NewMultiHookTracker() + hookTracker1.Add("restore1", "default", "my-pod", "container1", HookSourceAnnotation, "", hookPhase("")) - hookTracker2 := NewHookTracker() - hookTracker2.Add("default", "my-pod", "container1", HookSourceSpec, "my-hook-1", hookPhase("")) + hookTracker2 := NewMultiHookTracker() + hookTracker2.Add("restore1", "default", "my-pod", "container1", HookSourceSpec, "my-hook-1", hookPhase("")) - hookTracker3 := NewHookTracker() - hookTracker3.Add("default", "my-pod", "container1", HookSourceSpec, "my-hook-1", hookPhase("")) - hookTracker3.Add("default", "my-pod", "container2", HookSourceSpec, "my-hook-2", hookPhase("")) + hookTracker3 := NewMultiHookTracker() + hookTracker3.Add("restore1", "default", "my-pod", "container1", HookSourceSpec, "my-hook-1", hookPhase("")) + hookTracker3.Add("restore1", "default", "my-pod", "container2", HookSourceSpec, "my-hook-2", hookPhase("")) + + hookTracker4 := NewMultiHookTracker() + hookTracker4.Add("restore1", "default", "my-pod", "container1", HookSourceSpec, "my-hook-1", hookPhase("")) tests1 := []struct { name string @@ -1027,7 +1030,7 @@ func TestRestoreHookTrackerUpdate(t *testing.T) { groupResource string byContainer map[string][]PodExecRestoreHook expectedExecutions []expectedExecution - hookTracker *HookTracker + hookTracker *MultiHookTracker expectedFailed int }{ { @@ -1159,7 +1162,7 @@ func TestRestoreHookTrackerUpdate(t *testing.T) { }, }, }, - hookTracker: hookTracker2, + hookTracker: hookTracker4, expectedFailed: 1, }, { @@ -1243,7 +1246,7 @@ func TestRestoreHookTrackerUpdate(t *testing.T) { }, }, }, - hookTracker: NewHookTracker(), + hookTracker: NewMultiHookTracker(), expectedFailed: 0, }, } @@ -1271,8 +1274,8 @@ func TestRestoreHookTrackerUpdate(t *testing.T) { } ctx := context.Background() - _ = h.HandleHooks(ctx, velerotest.NewLogger(), test.initialPod, test.byContainer, test.hookTracker) - _, actualFailed := test.hookTracker.Stat() + _ = h.HandleHooks(ctx, velerotest.NewLogger(), test.initialPod, test.byContainer, test.hookTracker, "restore1") + _, actualFailed := test.hookTracker.Stat("restore1") assert.Equal(t, test.expectedFailed, actualFailed) }) } diff --git a/pkg/backup/backup.go b/pkg/backup/backup.go index cc2aefe900..4e88411913 100644 --- a/pkg/backup/backup.go +++ b/pkg/backup/backup.go @@ -467,7 +467,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers( updated.Status.HookStatus = &velerov1api.HookStatus{} } updated.Status.HookStatus.HooksAttempted, updated.Status.HookStatus.HooksFailed = itemBackupper.hookTracker.Stat() - log.Infof("hookTracker: %+v, hookAttempted: %d, hookFailed: %d", itemBackupper.hookTracker.GetTracker(), updated.Status.HookStatus.HooksAttempted, updated.Status.HookStatus.HooksFailed) + log.Debugf("hookAttempted: %d, hookFailed: %d", updated.Status.HookStatus.HooksAttempted, updated.Status.HookStatus.HooksFailed) if err := kube.PatchResource(backupRequest.Backup, updated, kb.kbClient); err != nil { log.WithError(errors.WithStack((err))).Warn("Got error trying to update backup's status.progress and hook status") diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index c98dd0f35e..4aa6a3ade9 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -54,6 +54,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" "github.com/vmware-tanzu/velero/internal/credentials" + "github.com/vmware-tanzu/velero/internal/hook" "github.com/vmware-tanzu/velero/internal/storage" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" @@ -943,6 +944,8 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string s.logger.Fatal(err, "fail to get controller-runtime informer from manager for PVR") } + multiHookTracker := hook.NewMultiHookTracker() + if _, ok := enabledRuntimeControllers[controller.Restore]; ok { restorer, err := restore.NewKubernetesRestorer( s.discoveryHelper, @@ -965,6 +968,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string s.kubeClient.CoreV1().RESTClient(), s.credentialFileStore, s.mgr.GetClient(), + multiHookTracker, ) cmd.CheckError(err) @@ -1017,6 +1021,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string backupStoreGetter, s.metrics, s.crClient, + multiHookTracker, ).SetupWithManager(s.mgr); err != nil { s.logger.Fatal(err, "unable to create controller", "controller", controller.RestoreFinalizer) } diff --git a/pkg/controller/restore_finalizer_controller.go b/pkg/controller/restore_finalizer_controller.go index 899fb0b36a..d9aaaa30a6 100644 --- a/pkg/controller/restore_finalizer_controller.go +++ b/pkg/controller/restore_finalizer_controller.go @@ -33,6 +33,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/utils/clock" + "github.com/vmware-tanzu/velero/internal/hook" "github.com/vmware-tanzu/velero/internal/volume" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" "github.com/vmware-tanzu/velero/pkg/metrics" @@ -55,6 +56,7 @@ type restoreFinalizerReconciler struct { metrics *metrics.ServerMetrics clock clock.WithTickerAndDelayedExecution crClient client.Client + multiHookTracker *hook.MultiHookTracker } func NewRestoreFinalizerReconciler( @@ -65,6 +67,7 @@ func NewRestoreFinalizerReconciler( backupStoreGetter persistence.ObjectBackupStoreGetter, metrics *metrics.ServerMetrics, crClient client.Client, + multiHookTracker *hook.MultiHookTracker, ) *restoreFinalizerReconciler { return &restoreFinalizerReconciler{ Client: client, @@ -75,6 +78,7 @@ func NewRestoreFinalizerReconciler( metrics: metrics, clock: &clock.RealClock{}, crClient: crClient, + multiHookTracker: multiHookTracker, } } @@ -151,11 +155,12 @@ func (r *restoreFinalizerReconciler) Reconcile(ctx context.Context, req ctrl.Req restoredPVCList := volume.RestoredPVCFromRestoredResourceList(restoredResourceList) finalizerCtx := &finalizerContext{ - logger: log, - restore: restore, - crClient: r.crClient, - volumeInfo: volumeInfo, - restoredPVCList: restoredPVCList, + logger: log, + restore: restore, + crClient: r.crClient, + volumeInfo: volumeInfo, + restoredPVCList: restoredPVCList, + multiHookTracker: r.multiHookTracker, } warnings, errs := finalizerCtx.execute() @@ -233,11 +238,12 @@ func (r *restoreFinalizerReconciler) finishProcessing(restorePhase velerov1api.R // finalizerContext includes all the dependencies required by finalization tasks and // a function execute() to orderly implement task logic. type finalizerContext struct { - logger logrus.FieldLogger - restore *velerov1api.Restore - crClient client.Client - volumeInfo []*volume.BackupVolumeInfo - restoredPVCList map[string]struct{} + logger logrus.FieldLogger + restore *velerov1api.Restore + crClient client.Client + volumeInfo []*volume.BackupVolumeInfo + restoredPVCList map[string]struct{} + multiHookTracker *hook.MultiHookTracker } func (ctx *finalizerContext) execute() (results.Result, results.Result) { //nolint:unparam //temporarily ignore the lint report: result 0 is always nil (unparam) @@ -247,6 +253,9 @@ func (ctx *finalizerContext) execute() (results.Result, results.Result) { //noli pdpErrs := ctx.patchDynamicPVWithVolumeInfo() errs.Merge(&pdpErrs) + rehErrs := ctx.WaitRestoreExecHook() + errs.Merge(&rehErrs) + return warnings, errs } @@ -373,3 +382,45 @@ func needPatch(newPV *v1.PersistentVolume, pvInfo *volume.PVInfo) bool { return false } + +// WaitRestoreExecHook waits for restore exec hooks to finish then update the hook execution results +func (ctx *finalizerContext) WaitRestoreExecHook() (errs results.Result) { + log := ctx.logger.WithField("restore", ctx.restore.Name) + log.Info("Waiting for restore exec hooks starts") + + // wait for restore exec hooks to finish + err := wait.PollUntilContextCancel(context.Background(), 1*time.Second, true, func(context.Context) (bool, error) { + log.Debug("Checking the progress of hooks execution") + if ctx.multiHookTracker.IsComplete(ctx.restore.Name) { + return true, nil + } + return false, nil + }) + if err != nil { + errs.Add(ctx.restore.Namespace, err) + return errs + } + log.Info("Done waiting for restore exec hooks starts") + + for _, ei := range ctx.multiHookTracker.HookErrs(ctx.restore.Name) { + errs.Add(ei.Namespace, ei.Err) + } + + // update hooks execution status + updated := ctx.restore.DeepCopy() + if updated.Status.HookStatus == nil { + updated.Status.HookStatus = &velerov1api.HookStatus{} + } + updated.Status.HookStatus.HooksAttempted, updated.Status.HookStatus.HooksFailed = ctx.multiHookTracker.Stat(ctx.restore.Name) + log.Debugf("hookAttempted: %d, hookFailed: %d", updated.Status.HookStatus.HooksAttempted, updated.Status.HookStatus.HooksFailed) + + if err := kubeutil.PatchResource(ctx.restore, updated, ctx.crClient); err != nil { + log.WithError(errors.WithStack((err))).Error("Updating restore status") + errs.Add(ctx.restore.Namespace, err) + } + + // delete the hook data for this restore + ctx.multiHookTracker.Delete(ctx.restore.Name) + + return errs +} diff --git a/pkg/controller/restore_finalizer_controller_test.go b/pkg/controller/restore_finalizer_controller_test.go index c72ab49c43..9ca5658493 100644 --- a/pkg/controller/restore_finalizer_controller_test.go +++ b/pkg/controller/restore_finalizer_controller_test.go @@ -18,6 +18,7 @@ package controller import ( "context" + "fmt" "testing" "time" @@ -34,6 +35,7 @@ import ( corev1api "k8s.io/api/core/v1" crclient "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/vmware-tanzu/velero/internal/hook" "github.com/vmware-tanzu/velero/internal/volume" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" "github.com/vmware-tanzu/velero/pkg/builder" @@ -135,6 +137,7 @@ func TestRestoreFinalizerReconcile(t *testing.T) { NewFakeSingleObjectBackupStoreGetter(backupStore), metrics.NewServerMetrics(), fakeClient, + hook.NewMultiHookTracker(), ) r.clock = testclocks.NewFakeClock(now) @@ -196,6 +199,7 @@ func TestUpdateResult(t *testing.T) { NewFakeSingleObjectBackupStoreGetter(backupStore), metrics.NewServerMetrics(), fakeClient, + hook.NewMultiHookTracker(), ) restore := builder.ForRestore(velerov1api.DefaultNamespace, "restore-1").Result() res := map[string]results.Result{"warnings": {}, "errors": {}} @@ -454,3 +458,99 @@ func TestPatchDynamicPVWithVolumeInfo(t *testing.T) { } } } + +func TestWaitRestoreExecHook(t *testing.T) { + hookTracker1 := hook.NewMultiHookTracker() + restoreName1 := "restore1" + + hookTracker2 := hook.NewMultiHookTracker() + restoreName2 := "restore2" + hookTracker2.Add(restoreName2, "ns", "pod", "con1", "s1", "h1", "") + hookTracker2.Record(restoreName2, "ns", "pod", "con1", "s1", "h1", "", false, nil) + + hookTracker3 := hook.NewMultiHookTracker() + restoreName3 := "restore3" + podNs, podName, container, source, hookName := "ns", "pod", "con1", "s1", "h1" + hookFailed, hookErr := true, fmt.Errorf("hook failed") + hookTracker3.Add(restoreName3, podNs, podName, container, source, hookName, hook.PhasePre) + + tests := []struct { + name string + hookTracker *hook.MultiHookTracker + restore *velerov1api.Restore + expectedHooksAttempted int + expectedHooksFailed int + expectedHookErrs int + waitSec int + podName string + podNs string + Container string + Source string + hookName string + hookFailed bool + hookErr error + }{ + { + name: "no restore exec hooks", + hookTracker: hookTracker1, + restore: builder.ForRestore(velerov1api.DefaultNamespace, restoreName1).Result(), + expectedHooksAttempted: 0, + expectedHooksFailed: 0, + expectedHookErrs: 0, + }, + { + name: "1 restore exec hook having been executed", + hookTracker: hookTracker2, + restore: builder.ForRestore(velerov1api.DefaultNamespace, restoreName2).Result(), + expectedHooksAttempted: 1, + expectedHooksFailed: 0, + expectedHookErrs: 0, + }, + { + name: "1 restore exec hook to be executed", + hookTracker: hookTracker3, + restore: builder.ForRestore(velerov1api.DefaultNamespace, restoreName3).Result(), + waitSec: 2, + expectedHooksAttempted: 1, + expectedHooksFailed: 1, + expectedHookErrs: 1, + podName: podName, + podNs: podNs, + Container: container, + Source: source, + hookName: hookName, + hookFailed: hookFailed, + hookErr: hookErr, + }, + } + + for _, tc := range tests { + var ( + fakeClient = velerotest.NewFakeControllerRuntimeClientBuilder(t).Build() + logger = velerotest.NewLogger() + ) + ctx := &finalizerContext{ + logger: logger, + crClient: fakeClient, + restore: tc.restore, + multiHookTracker: tc.hookTracker, + } + require.NoError(t, ctx.crClient.Create(context.Background(), tc.restore)) + + if tc.waitSec > 0 { + go func() { + time.Sleep(time.Second * time.Duration(tc.waitSec)) + tc.hookTracker.Record(tc.restore.Name, tc.podNs, tc.podName, tc.Container, tc.Source, tc.hookName, hook.PhasePre, tc.hookFailed, tc.hookErr) + }() + } + + errs := ctx.WaitRestoreExecHook() + assert.Len(t, errs.Namespaces, tc.expectedHookErrs) + + updated := &velerov1api.Restore{} + err := ctx.crClient.Get(context.Background(), crclient.ObjectKey{Namespace: velerov1api.DefaultNamespace, Name: tc.restore.Name}, updated) + assert.NoError(t, err) + assert.Equal(t, tc.expectedHooksAttempted, updated.Status.HookStatus.HooksAttempted) + assert.Equal(t, tc.expectedHooksFailed, updated.Status.HookStatus.HooksFailed) + } +} diff --git a/pkg/restore/restore.go b/pkg/restore/restore.go index d2f383079e..b08f59a6ba 100644 --- a/pkg/restore/restore.go +++ b/pkg/restore/restore.go @@ -114,6 +114,7 @@ type kubernetesRestorer struct { podGetter cache.Getter credentialFileStore credentials.FileStore kbClient crclient.Client + multiHookTracker *hook.MultiHookTracker } // NewKubernetesRestorer creates a new kubernetesRestorer. @@ -131,6 +132,7 @@ func NewKubernetesRestorer( podGetter cache.Getter, credentialStore credentials.FileStore, kbClient crclient.Client, + multiHookTracker *hook.MultiHookTracker, ) (Restorer, error) { return &kubernetesRestorer{ discoveryHelper: discoveryHelper, @@ -155,6 +157,7 @@ func NewKubernetesRestorer( podGetter: podGetter, credentialFileStore: credentialStore, kbClient: kbClient, + multiHookTracker: multiHookTracker, }, nil } @@ -252,11 +255,6 @@ func (kr *kubernetesRestorer) RestoreWithResolvers( } } - resourceRestoreHooks, err := hook.GetRestoreHooksFromSpec(&req.Restore.Spec.Hooks) - if err != nil { - return results.Result{}, results.Result{Velero: []string{err.Error()}} - } - hooksCtx, hooksCancelFunc := go_context.WithCancel(go_context.Background()) waitExecHookHandler := &hook.DefaultWaitExecHookHandler{ PodCommandExecutor: kr.podCommandExecutor, ListWatchFactory: &hook.DefaultListWatchFactory{ @@ -264,6 +262,11 @@ func (kr *kubernetesRestorer) RestoreWithResolvers( }, } + hooksWaitExecutor, err := newHooksWaitExecutor(req.Restore, waitExecHookHandler) + if err != nil { + return results.Result{}, results.Result{Velero: []string{err.Error()}} + } + pvRestorer := &pvRestorer{ logger: req.Log, backup: req.Backup, @@ -310,18 +313,14 @@ func (kr *kubernetesRestorer) RestoreWithResolvers( pvRenamer: kr.pvRenamer, discoveryHelper: kr.discoveryHelper, resourcePriorities: kr.resourcePriorities, - resourceRestoreHooks: resourceRestoreHooks, - hooksErrs: make(chan hook.HookErrInfo), - waitExecHookHandler: waitExecHookHandler, - hooksContext: hooksCtx, - hooksCancelFunc: hooksCancelFunc, kbClient: kr.kbClient, itemOperationsList: req.GetItemOperationsList(), resourceModifiers: req.ResourceModifiers, disableInformerCache: req.DisableInformerCache, - hookTracker: hook.NewHookTracker(), + multiHookTracker: kr.multiHookTracker, backupVolumeInfoMap: req.BackupVolumeInfoMap, restoreVolumeInfoTracker: req.RestoreVolumeInfoTracker, + hooksWaitExecutor: hooksWaitExecutor, } return restoreCtx.execute() @@ -362,19 +361,14 @@ type restoreContext struct { pvRenamer func(string) (string, error) discoveryHelper discovery.Helper resourcePriorities Priorities - hooksWaitGroup sync.WaitGroup - hooksErrs chan hook.HookErrInfo - resourceRestoreHooks []hook.ResourceRestoreHook - waitExecHookHandler hook.WaitExecHookHandler - hooksContext go_context.Context - hooksCancelFunc go_context.CancelFunc kbClient crclient.Client itemOperationsList *[]*itemoperation.RestoreOperation resourceModifiers *resourcemodifiers.ResourceModifiers disableInformerCache bool - hookTracker *hook.HookTracker + multiHookTracker *hook.MultiHookTracker backupVolumeInfoMap map[string]volume.BackupVolumeInfo restoreVolumeInfoTracker *volume.RestoreVolumeInfoTracker + hooksWaitExecutor *hooksWaitExecutor } type resourceClientKey struct { @@ -650,6 +644,12 @@ func (ctx *restoreContext) execute() (results.Result, results.Result) { updated.Status.Progress.TotalItems = len(ctx.restoredItems) updated.Status.Progress.ItemsRestored = len(ctx.restoredItems) + // patch the restore + err = kube.PatchResource(ctx.restore, updated, ctx.kbClient) + if err != nil { + ctx.log.WithError(errors.WithStack((err))).Warn("Updating restore status") + } + // Wait for all of the pod volume restore goroutines to be done, which is // only possible once all of their errors have been received by the loop // below, then close the podVolumeErrs channel so the loop terminates. @@ -672,31 +672,6 @@ func (ctx *restoreContext) execute() (results.Result, results.Result) { } ctx.log.Info("Done waiting for all pod volume restores to complete") - // Wait for all post-restore exec hooks with same logic as pod volume wait above. - go func() { - ctx.log.Info("Waiting for all post-restore-exec hooks to complete") - - ctx.hooksWaitGroup.Wait() - close(ctx.hooksErrs) - }() - for errInfo := range ctx.hooksErrs { - errs.Add(errInfo.Namespace, errInfo.Err) - } - ctx.log.Info("Done waiting for all post-restore exec hooks to complete") - - // update hooks execution status - if updated.Status.HookStatus == nil { - updated.Status.HookStatus = &velerov1api.HookStatus{} - } - updated.Status.HookStatus.HooksAttempted, updated.Status.HookStatus.HooksFailed = ctx.hookTracker.Stat() - ctx.log.Infof("hookTracker: %+v, hookAttempted: %d, hookFailed: %d", ctx.hookTracker.GetTracker(), updated.Status.HookStatus.HooksAttempted, updated.Status.HookStatus.HooksFailed) - - // patch the restore status - err = kube.PatchResource(ctx.restore, updated, ctx.kbClient) - if err != nil { - ctx.log.WithError(errors.WithStack((err))).Warn("Updating restore status") - } - return warnings, errs } @@ -1730,8 +1705,24 @@ func (ctx *restoreContext) restoreItem(obj *unstructured.Unstructured, groupReso } } + // Asynchronously executes restore exec hooks if any + // Velero will wait for all the asynchronous hook operations to finish in finalizing phase, using hook tracker to track the execution progress. if groupResource == kuberesource.Pods { - ctx.waitExec(createdObj) + pod := new(v1.Pod) + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(createdObj.UnstructuredContent(), &pod); err != nil { + ctx.log.Errorf("error converting pod %s: %v", kube.NamespaceAndName(obj), err) + errs.Add(namespace, err) + return warnings, errs, itemExists + } + + execHooksByContainer, err := ctx.hooksWaitExecutor.groupHooks(ctx.restore.Name, pod, ctx.multiHookTracker) + if err != nil { + ctx.log.Errorf("error grouping hooks from pod %s: %v", kube.NamespaceAndName(obj), err) + errs.Add(namespace, err) + return warnings, errs, itemExists + } + + ctx.hooksWaitExecutor.exec(execHooksByContainer, pod, ctx.multiHookTracker, ctx.restore.Name) } // Wait for a CRD to be available for instantiating resources @@ -1898,41 +1889,48 @@ func restorePodVolumeBackups(ctx *restoreContext, createdObj *unstructured.Unstr } } -// waitExec executes hooks in a restored pod's containers when they become ready. -func (ctx *restoreContext) waitExec(createdObj *unstructured.Unstructured) { - ctx.hooksWaitGroup.Add(1) - go func() { - // Done() will only be called after all errors have been successfully sent - // on the ctx.podVolumeErrs channel. - defer ctx.hooksWaitGroup.Done() +// hooksWaitExecutor is used to collect necessary fields that are required to asynchronously execute restore exec hooks +// note that fields are shared across different pods within a specific restore +// and separate hooksWaitExecutors instance will be created for different restores without interfering with each other. +type hooksWaitExecutor struct { + log logrus.FieldLogger + hooksContext go_context.Context + hooksCancelFunc go_context.CancelFunc + resourceRestoreHooks []hook.ResourceRestoreHook + waitExecHookHandler hook.WaitExecHookHandler +} - podNs := createdObj.GetNamespace() - pod := new(v1.Pod) - if err := runtime.DefaultUnstructuredConverter.FromUnstructured(createdObj.UnstructuredContent(), &pod); err != nil { - ctx.log.WithError(err).Error("error converting unstructured pod") - ctx.hooksErrs <- hook.HookErrInfo{Namespace: podNs, Err: err} - return - } - execHooksByContainer, err := hook.GroupRestoreExecHooks( - ctx.resourceRestoreHooks, - pod, - ctx.log, - ctx.hookTracker, - ) - if err != nil { - ctx.log.WithError(err).Errorf("error getting exec hooks for pod %s/%s", pod.Namespace, pod.Name) - ctx.hooksErrs <- hook.HookErrInfo{Namespace: podNs, Err: err} - return - } +func newHooksWaitExecutor(restore *velerov1api.Restore, waitExecHookHandler hook.WaitExecHookHandler) (*hooksWaitExecutor, error) { + resourceRestoreHooks, err := hook.GetRestoreHooksFromSpec(&restore.Spec.Hooks) + if err != nil { + return nil, err + } + hooksCtx, hooksCancelFunc := go_context.WithCancel(go_context.Background()) + hwe := &hooksWaitExecutor{ + log: logrus.WithField("restore", restore.Name), + hooksContext: hooksCtx, + hooksCancelFunc: hooksCancelFunc, + resourceRestoreHooks: resourceRestoreHooks, + waitExecHookHandler: waitExecHookHandler, + } + return hwe, nil +} - if errs := ctx.waitExecHookHandler.HandleHooks(ctx.hooksContext, ctx.log, pod, execHooksByContainer, ctx.hookTracker); len(errs) > 0 { - ctx.log.WithError(kubeerrs.NewAggregate(errs)).Error("unable to successfully execute post-restore hooks") - ctx.hooksCancelFunc() +// groupHooks returns a list of hooks to be executed in a pod grouped bycontainer name. +func (hwe *hooksWaitExecutor) groupHooks(restoreName string, pod *v1.Pod, multiHookTracker *hook.MultiHookTracker) (map[string][]hook.PodExecRestoreHook, error) { + execHooksByContainer, err := hook.GroupRestoreExecHooks(restoreName, hwe.resourceRestoreHooks, pod, hwe.log, multiHookTracker) + return execHooksByContainer, err +} - for _, err := range errs { - // Errors are already logged in the HandleHooks method. - ctx.hooksErrs <- hook.HookErrInfo{Namespace: podNs, Err: err} - } +// exec asynchronously executes hooks in a restored pod's containers when they become ready. +// Goroutine within this function will continue running until the hook executions are complete. +// Velero will wait for goroutine to finish in finalizing phase, using hook tracker to track the progress. +// To optimize memory usage, ensure that the variables used in this function are kept to a minimum to prevent unnecessary retention in memory. +func (hwe *hooksWaitExecutor) exec(execHooksByContainer map[string][]hook.PodExecRestoreHook, pod *v1.Pod, multiHookTracker *hook.MultiHookTracker, restoreName string) { + go func() { + if errs := hwe.waitExecHookHandler.HandleHooks(hwe.hooksContext, hwe.log, pod, execHooksByContainer, multiHookTracker, restoreName); len(errs) > 0 { + hwe.log.WithError(kubeerrs.NewAggregate(errs)).Error("unable to successfully execute post-restore hooks") + hwe.hooksCancelFunc() } }() }