diff --git a/changelogs/unreleased/7571-ywk253100 b/changelogs/unreleased/7571-ywk253100
new file mode 100644
index 0000000000..4ba925a052
--- /dev/null
+++ b/changelogs/unreleased/7571-ywk253100
@@ -0,0 +1 @@
+Improve the concurrency for PVBs in different pods
\ No newline at end of file
diff --git a/pkg/backup/backup.go b/pkg/backup/backup.go
index 67e20acb0e..bd79eefed2 100644
--- a/pkg/backup/backup.go
+++ b/pkg/backup/backup.go
@@ -419,6 +419,9 @@ func (kb *kubernetesBackupper) BackupWithResolvers(log logrus.FieldLogger,
 		}
 	}
 
+	processedPVBs := itemBackupper.podVolumeBackupper.WaitAllPodVolumesProcessed(log)
+	backupRequest.PodVolumeBackups = append(backupRequest.PodVolumeBackups, processedPVBs...)
+
 	// do a final update on progress since we may have just added some CRDs and may not have updated
 	// for the last few processed items.
 	updated = backupRequest.Backup.DeepCopy()
diff --git a/pkg/backup/backup_test.go b/pkg/backup/backup_test.go
index 0ceb0572a6..1de37947fe 100644
--- a/pkg/backup/backup_test.go
+++ b/pkg/backup/backup_test.go
@@ -3054,7 +3054,9 @@ func (f *fakePodVolumeBackupperFactory) NewBackupper(context.Context, *velerov1.
 	return &fakePodVolumeBackupper{}, nil
 }
 
-type fakePodVolumeBackupper struct{}
+type fakePodVolumeBackupper struct {
+	pvbs []*velerov1.PodVolumeBackup
+}
 
 // BackupPodVolumes returns one pod volume backup per entry in volumes, with namespace "velero"
 // and name "pvb-<pod-namespace>-<pod-name>-<volume-name>".
@@ -3072,9 +3074,15 @@ func (b *fakePodVolumeBackupper) BackupPodVolumes(backup *velerov1.Backup, pod *
 		res = append(res, pvb)
 	}
 
+	b.pvbs = res
+
 	return res, pvcSummary, nil
 }
 
+func (b *fakePodVolumeBackupper) WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velerov1.PodVolumeBackup {
+	return b.pvbs
+}
+
 // TestBackupWithPodVolume runs backups of pods that are annotated for PodVolume backup,
 // and ensures that the pod volume backupper is called, that the returned PodVolumeBackups
 // are added to the Request object, and that when PVCs are backed up with PodVolume, the
@@ -3289,7 +3297,7 @@ func newHarness(t *testing.T) *harness {
 			// unsupported
 			podCommandExecutor: nil,
 
-			podVolumeBackupperFactory: nil,
+			podVolumeBackupperFactory: new(fakePodVolumeBackupperFactory),
 			podVolumeTimeout:          0,
 		},
 		log: log,
diff --git a/pkg/backup/item_backupper.go b/pkg/backup/item_backupper.go
index 7f1d826857..4b589fae31 100644
--- a/pkg/backup/item_backupper.go
+++ b/pkg/backup/item_backupper.go
@@ -270,7 +270,6 @@ func (ib *itemBackupper) backupItemInternal(logger logrus.FieldLogger, obj runti
 		// even if there are errors.
 		podVolumeBackups, podVolumePVCBackupSummary, errs := ib.backupPodVolumes(log, pod, pvbVolumes)
 
-		ib.backupRequest.PodVolumeBackups = append(ib.backupRequest.PodVolumeBackups, podVolumeBackups...)
 		backupErrs = append(backupErrs, errs...)
 
 		// Mark the volumes that has been processed by pod volume backup as Taken in the tracker.
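The two hunks above are the consumer side of this change: item_backupper.go stops appending PVB results per pod, and backup.go collects them all once, after every item has been processed, via the new WaitAllPodVolumesProcessed. Combined with the pkg/podvolume changes below, BackupPodVolumes no longer blocks per pod, so PVBs for different pods run concurrently. A minimal sketch of that start-then-wait pattern (illustrative only; pvbStarter and its methods are hypothetical names, not Velero types):

package main

import (
	"fmt"
	"sync"
)

// pvbStarter is a hypothetical stand-in for the reworked podvolume.Backupper:
// Start launches work without blocking; WaitAll blocks exactly once for everything.
type pvbStarter struct {
	wg      sync.WaitGroup
	mu      sync.Mutex
	results []string
}

// Start kicks off one asynchronous unit of work per volume and returns immediately.
func (s *pvbStarter) Start(pod string, volumes []string) {
	for _, v := range volumes {
		s.wg.Add(1)
		go func(v string) { // stands in for the node agent completing a PVB
			defer s.wg.Done()
			s.mu.Lock()
			defer s.mu.Unlock()
			s.results = append(s.results, pod+"/"+v)
		}(v)
	}
}

// WaitAll blocks until every started unit has finished, then returns the results.
func (s *pvbStarter) WaitAll() []string {
	s.wg.Wait()
	return s.results
}

func main() {
	s := &pvbStarter{}
	s.Start("pod-a", []string{"vol-1", "vol-2"}) // returns immediately
	s.Start("pod-b", []string{"vol-1"})          // pods no longer serialize on each other
	fmt.Println(s.WaitAll())                     // single wait at the end of the backup
}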
diff --git a/pkg/podvolume/backupper.go b/pkg/podvolume/backupper.go
index c40a198d20..8d06cb2225 100644
--- a/pkg/podvolume/backupper.go
+++ b/pkg/podvolume/backupper.go
@@ -45,17 +45,19 @@ import (
 type Backupper interface {
 	// BackupPodVolumes backs up all specified volumes in a pod.
 	BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.Pod, volumesToBackup []string, resPolicies *resourcepolicies.Policies, log logrus.FieldLogger) ([]*velerov1api.PodVolumeBackup, *PVCBackupSummary, []error)
+	WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velerov1api.PodVolumeBackup
 }
 
 type backupper struct {
-	ctx          context.Context
-	repoLocker   *repository.RepoLocker
-	repoEnsurer  *repository.Ensurer
-	crClient     ctrlclient.Client
-	uploaderType string
-
-	results     map[string]chan *velerov1api.PodVolumeBackup
-	resultsLock sync.Mutex
+	ctx                 context.Context
+	repoLocker          *repository.RepoLocker
+	repoEnsurer         *repository.Ensurer
+	crClient            ctrlclient.Client
+	uploaderType        string
+	pvbInformer         ctrlcache.Informer
+	handlerRegistration cache.ResourceEventHandlerRegistration
+	wg                  sync.WaitGroup
+	result              []*velerov1api.PodVolumeBackup
 }
 
 type skippedPVC struct {
@@ -105,7 +107,6 @@ func newBackupper(
 	crClient ctrlclient.Client,
 	uploaderType string,
 	backup *velerov1api.Backup,
-	log logrus.FieldLogger,
 ) *backupper {
 	b := &backupper{
 		ctx: ctx,
@@ -113,11 +114,12 @@
 		repoEnsurer:  repoEnsurer,
 		crClient:     crClient,
 		uploaderType: uploaderType,
-
-		results: make(map[string]chan *velerov1api.PodVolumeBackup),
+		pvbInformer:  pvbInformer,
+		wg:           sync.WaitGroup{},
+		result:       []*velerov1api.PodVolumeBackup{},
 	}
 
-	_, _ = pvbInformer.AddEventHandler(
+	b.handlerRegistration, _ = pvbInformer.AddEventHandler(
 		cache.ResourceEventHandlerFuncs{
 			UpdateFunc: func(_, obj interface{}) {
 				pvb := obj.(*velerov1api.PodVolumeBackup)
@@ -126,17 +128,13 @@
 					return
 				}
 
-				if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseCompleted || pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
-					b.resultsLock.Lock()
-					defer b.resultsLock.Unlock()
-
-					resChan, ok := b.results[resultsKey(pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name)]
-					if !ok {
-						log.Errorf("No results channel found for pod %s/%s to send pod volume backup %s/%s on", pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name, pvb.Namespace, pvb.Name)
-						return
-					}
-					resChan <- pvb
+				if pvb.Status.Phase != velerov1api.PodVolumeBackupPhaseCompleted &&
+					pvb.Status.Phase != velerov1api.PodVolumeBackupPhaseFailed {
+					return
 				}
+
+				b.result = append(b.result, pvb)
+				b.wg.Done()
 			},
 		},
 	)
@@ -217,12 +215,6 @@ func (b *backupper) BackupPodVolumes(backup *velerov1api.Backup, pod *corev1api.
 	b.repoLocker.Lock(repo.Name)
 	defer b.repoLocker.Unlock(repo.Name)
 
-	resultsChan := make(chan *velerov1api.PodVolumeBackup)
-
-	b.resultsLock.Lock()
-	b.results[resultsKey(pod.Namespace, pod.Name)] = resultsChan
-	b.resultsLock.Unlock()
-
 	var (
 		podVolumeBackups  []*velerov1api.PodVolumeBackup
 		mountedPodVolumes = sets.Set[string]{}
 	)
@@ -243,7 +235,6 @@
 		repoIdentifier = repo.Spec.ResticIdentifier
 	}
 
-	var numVolumeSnapshots int
 	for _, volumeName := range volumesToBackup {
 		volume, ok := podVolumes[volumeName]
 		if !ok {
@@ -305,32 +296,40 @@
 			errs = append(errs, err)
 			continue
 		}
+		b.wg.Add(1)
+
 		podVolumeBackups = append(podVolumeBackups, volumeBackup)
 		pvcSummary.addBackedup(volumeName)
-		numVolumeSnapshots++
 	}
 
-ForEachVolume:
-	for i, count := 0, numVolumeSnapshots; i < count; i++ {
-		select {
-		case <-b.ctx.Done():
-			errs = append(errs, errors.New("timed out waiting for all PodVolumeBackups to complete"))
-			break ForEachVolume
-		case res := <-resultsChan:
-			switch res.Status.Phase {
-			case velerov1api.PodVolumeBackupPhaseCompleted:
-				podVolumeBackups = append(podVolumeBackups, res)
-			case velerov1api.PodVolumeBackupPhaseFailed:
-				errs = append(errs, errors.Errorf("pod volume backup failed: %s", res.Status.Message))
-				podVolumeBackups = append(podVolumeBackups, res)
+	return podVolumeBackups, pvcSummary, errs
+}
+
+func (b *backupper) WaitAllPodVolumesProcessed(log logrus.FieldLogger) []*velerov1api.PodVolumeBackup {
+	defer func() {
+		if err := b.pvbInformer.RemoveEventHandler(b.handlerRegistration); err != nil {
+			log.Debugf("failed to remove the event handler for PVB: %v", err)
+		}
+	}()
+
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+		b.wg.Wait()
+	}()
+
+	var podVolumeBackups []*velerov1api.PodVolumeBackup
+	select {
+	case <-b.ctx.Done():
+		log.Error("timed out waiting for all PodVolumeBackups to complete")
+	case <-done:
+		for _, pvb := range b.result {
+			podVolumeBackups = append(podVolumeBackups, pvb)
+			if pvb.Status.Phase == velerov1api.PodVolumeBackupPhaseFailed {
+				log.Errorf("pod volume backup failed: %s", pvb.Status.Message)
 			}
 		}
 	}
-
-	b.resultsLock.Lock()
-	delete(b.results, resultsKey(pod.Namespace, pod.Name))
-	b.resultsLock.Unlock()
-
-	return podVolumeBackups, pvcSummary, errs
+	return podVolumeBackups
 }
 
 func skipAllPodVolumes(pod *corev1api.Pod, volumesToBackup []string, err error, pvcSummary *PVCBackupSummary, log logrus.FieldLogger) {
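WaitAllPodVolumesProcessed bounds the WaitGroup wait with the backupper's context: it runs wg.Wait in a goroutine, closes a done channel when the wait returns, and selects between that channel and ctx.Done. A self-contained sketch of just that pattern, assuming every wg.Add is eventually matched by a wg.Done from the informer handler (waitWithContext is a hypothetical helper, not part of Velero):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// waitWithContext returns true if the WaitGroup drains before the context
// expires, false otherwise. On timeout, the inner goroutine keeps blocking
// until the WaitGroup drains, so callers must still ensure Done is called.
func waitWithContext(ctx context.Context, wg *sync.WaitGroup) bool {
	done := make(chan struct{})
	go func() {
		defer close(done)
		wg.Wait()
	}()
	select {
	case <-ctx.Done():
		return false // gave up waiting
	case <-done:
		return true
	}
}

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() { defer wg.Done(); time.Sleep(50 * time.Millisecond) }()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println(waitWithContext(ctx, &wg)) // true: the work beat the deadline
}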
diff --git a/pkg/podvolume/backupper_factory.go b/pkg/podvolume/backupper_factory.go
index 84020e6642..fb166f1105 100644
--- a/pkg/podvolume/backupper_factory.go
+++ b/pkg/podvolume/backupper_factory.go
@@ -60,7 +60,7 @@ type backupperFactory struct {
 }
 
 func (bf *backupperFactory) NewBackupper(ctx context.Context, backup *velerov1api.Backup, uploaderType string) (Backupper, error) {
-	b := newBackupper(ctx, bf.repoLocker, bf.repoEnsurer, bf.pvbInformer, bf.crClient, uploaderType, backup, bf.log)
+	b := newBackupper(ctx, bf.repoLocker, bf.repoEnsurer, bf.pvbInformer, bf.crClient, uploaderType, backup)
 
 	if !cache.WaitForCacheSync(ctx.Done(), bf.pvbInformer.HasSynced) {
 		return nil, errors.New("timed out waiting for caches to sync")
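The rewritten tests below assert on log output rather than returned errors, since WaitAllPodVolumesProcessed now reports failures through the logger. They do that with a logrus hook that captures error-level entries. A minimal standalone version of the capture technique (errorHook here is illustrative; the test's own helper is logHook):

package main

import (
	"fmt"

	"github.com/sirupsen/logrus"
)

// errorHook records the most recent error-level entry so a test can
// assert on its message.
type errorHook struct {
	last *logrus.Entry
}

func (h *errorHook) Levels() []logrus.Level { return []logrus.Level{logrus.ErrorLevel} }

func (h *errorHook) Fire(e *logrus.Entry) error {
	h.last = e
	return nil
}

func main() {
	logger := logrus.New()
	hook := &errorHook{}
	logger.Hooks.Add(hook)

	logger.Error("pod volume backup failed: fake-message")
	fmt.Println(hook.last.Message) // the assertion target in the test
}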
diff --git a/pkg/podvolume/backupper_test.go b/pkg/podvolume/backupper_test.go
index bf563bdd55..518c3fddd1 100644
--- a/pkg/podvolume/backupper_test.go
+++ b/pkg/podvolume/backupper_test.go
@@ -29,6 +29,7 @@ import (
 	corev1api "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
+	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
 	ctrlfake "sigs.k8s.io/controller-runtime/pkg/client/fake"
 
 	clientTesting "k8s.io/client-go/testing"
@@ -307,15 +308,8 @@ func TestBackupPodVolumes(t *testing.T) {
 	velerov1api.AddToScheme(scheme)
 	corev1api.AddToScheme(scheme)
 
-	ctxWithCancel, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
-	failedPVB := createPVBObj(true, false, 1, "")
-	completedPVB := createPVBObj(false, false, 1, "")
-
 	tests := []struct {
 		name          string
-		ctx           context.Context
 		bsl           string
 		uploaderType  string
 		volumes       []string
@@ -325,7 +319,6 @@ func TestBackupPodVolumes(t *testing.T) {
 		veleroClientObj []runtime.Object
 		veleroReactors  []reactor
 		runtimeScheme   *runtime.Scheme
-		retPVBs         []*velerov1api.PodVolumeBackup
 		pvbs            []*velerov1api.PodVolumeBackup
 		errs            []string
 	}{
@@ -493,87 +486,11 @@ func TestBackupPodVolumes(t *testing.T) {
 			uploaderType: "kopia",
 			bsl:          "fake-bsl",
 		},
-		{
-			name: "context canceled",
-			ctx:  ctxWithCancel,
-			volumes: []string{
-				"fake-volume-1",
-			},
-			sourcePod: createPodObj(true, true, true, 1),
-			kubeClientObj: []runtime.Object{
-				createNodeAgentPodObj(true),
-				createPVCObj(1),
-				createPVObj(1, false),
-			},
-			ctlClientObj: []runtime.Object{
-				createBackupRepoObj(),
-			},
-			runtimeScheme: scheme,
-			uploaderType:  "kopia",
-			bsl:           "fake-bsl",
-			errs: []string{
-				"timed out waiting for all PodVolumeBackups to complete",
-			},
-		},
-		{
-			name: "return failed pvbs",
-			volumes: []string{
-				"fake-volume-1",
-			},
-			sourcePod: createPodObj(true, true, true, 1),
-			kubeClientObj: []runtime.Object{
-				createNodeAgentPodObj(true),
-				createPVCObj(1),
-				createPVObj(1, false),
-			},
-			ctlClientObj: []runtime.Object{
-				createBackupRepoObj(),
-			},
-			runtimeScheme: scheme,
-			uploaderType:  "kopia",
-			bsl:           "fake-bsl",
-			retPVBs: []*velerov1api.PodVolumeBackup{
-				failedPVB,
-			},
-			pvbs: []*velerov1api.PodVolumeBackup{
-				failedPVB,
-			},
-			errs: []string{
-				"pod volume backup failed: fake-message",
-			},
-		},
-		{
-			name: "return completed pvbs",
-			volumes: []string{
-				"fake-volume-1",
-			},
-			sourcePod: createPodObj(true, true, true, 1),
-			kubeClientObj: []runtime.Object{
-				createNodeAgentPodObj(true),
-				createPVCObj(1),
-				createPVObj(1, false),
-			},
-			ctlClientObj: []runtime.Object{
-				createBackupRepoObj(),
-			},
-			runtimeScheme: scheme,
-			uploaderType:  "kopia",
-			bsl:           "fake-bsl",
-			retPVBs: []*velerov1api.PodVolumeBackup{
-				completedPVB,
-			},
-			pvbs: []*velerov1api.PodVolumeBackup{
-				completedPVB,
-			},
-		},
 	}
 
 	// TODO add more verification around PVCBackupSummary returned by "BackupPodVolumes"
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			ctx := context.Background()
-			if test.ctx != nil {
-				ctx = test.ctx
-			}
 
 			fakeClientBuilder := ctrlfake.NewClientBuilder()
 			if test.runtimeScheme != nil {
@@ -606,18 +523,6 @@ func TestBackupPodVolumes(t *testing.T) {
 
 			require.NoError(t, err)
 
-			go func() {
-				if test.ctx != nil {
-					time.Sleep(time.Second)
-					cancel()
-				} else if test.retPVBs != nil {
-					time.Sleep(time.Second)
-					for _, pvb := range test.retPVBs {
-						bp.(*backupper).results[resultsKey(test.sourcePod.Namespace, test.sourcePod.Name)] <- pvb
-					}
-				}
-			}()
-
 			pvbs, _, errs := bp.BackupPodVolumes(backupObj, test.sourcePod, test.volumes, nil, velerotest.NewLogger())
 
 			if errs == nil {
@@ -633,6 +538,102 @@ func TestBackupPodVolumes(t *testing.T) {
 	}
 }
 
+type logHook struct {
+	entry *logrus.Entry
+}
+
+func (l *logHook) Levels() []logrus.Level {
+	return []logrus.Level{logrus.ErrorLevel}
+}
+func (l *logHook) Fire(entry *logrus.Entry) error {
+	l.entry = entry
+	return nil
+}
+
+func TestWaitAllPodVolumesProcessed(t *testing.T) {
+	timeoutCtx, _ := context.WithTimeout(context.Background(), 1*time.Second)
+	cases := []struct {
+		name              string
+		ctx               context.Context
+		statusToBeUpdated *velerov1api.PodVolumeBackupStatus
+		expectedErr       string
+		expectedPVBPhase  velerov1api.PodVolumeBackupPhase
+	}{
+		{
+			name:        "context cancelled",
+			ctx:         timeoutCtx,
+			expectedErr: "timed out waiting for all PodVolumeBackups to complete",
+		},
+		{
+			name: "failed pvbs",
+			ctx:  context.Background(),
+			statusToBeUpdated: &velerov1api.PodVolumeBackupStatus{
+				Phase:   velerov1api.PodVolumeBackupPhaseFailed,
+				Message: "failed",
+			},
+			expectedPVBPhase: velerov1api.PodVolumeBackupPhaseFailed,
+			expectedErr:      "pod volume backup failed: failed",
+		},
+		{
+			name: "completed pvbs",
+			ctx:  context.Background(),
+			statusToBeUpdated: &velerov1api.PodVolumeBackupStatus{
+				Phase:   velerov1api.PodVolumeBackupPhaseCompleted,
+				Message: "completed",
+			},
+			expectedPVBPhase: velerov1api.PodVolumeBackupPhaseCompleted,
+		},
+	}
+
+	for _, c := range cases {
+		newPVB := builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb").Result()
+		scheme := runtime.NewScheme()
+		velerov1api.AddToScheme(scheme)
+		client := ctrlfake.NewClientBuilder().WithScheme(scheme).WithObjects(newPVB).Build()
+
+		lw := kube.InternalLW{
+			Client:     client,
+			Namespace:  velerov1api.DefaultNamespace,
+			ObjectList: new(velerov1api.PodVolumeBackupList),
+		}
+
+		informer := cache.NewSharedIndexInformer(&lw, &velerov1api.PodVolumeBackup{}, 0, cache.Indexers{})
+
+		ctx := context.Background()
+		go informer.Run(ctx.Done())
+		require.True(t, cache.WaitForCacheSync(ctx.Done(), informer.HasSynced))
+
+		logger := logrus.New()
+		logHook := &logHook{}
+		logger.Hooks.Add(logHook)
+
+		backuper := newBackupper(c.ctx, nil, nil, informer, nil, "", &velerov1api.Backup{})
+		backuper.wg.Add(1)
+
+		if c.statusToBeUpdated != nil {
+			pvb := &velerov1api.PodVolumeBackup{}
+			err := client.Get(context.Background(), ctrlclient.ObjectKey{Namespace: newPVB.Namespace, Name: newPVB.Name}, pvb)
+			require.Nil(t, err)
+
+			pvb.Status = *c.statusToBeUpdated
+			err = client.Update(context.Background(), pvb)
+			require.Nil(t, err)
+		}
+
+		pvbs := backuper.WaitAllPodVolumesProcessed(logger)
+
+		if c.expectedErr != "" {
+			assert.Equal(t, c.expectedErr, logHook.entry.Message)
+		}
+
+		if c.expectedPVBPhase != "" {
+			require.Len(t, pvbs, 1)
+			assert.Equal(t, c.expectedPVBPhase, pvbs[0].Status.Phase)
+		}
+
+	}
+}
+
 func TestPVCBackupSummary(t *testing.T) {
 	pbs := NewPVCBackupSummary()
 	pbs.pvcMap["vol-1"] = builder.ForPersistentVolumeClaim("ns-1", "pvc-1").VolumeName("pv-1").Result()