diff --git a/go.mod b/go.mod
index 61885fe44c..3a2e3bfae9 100644
--- a/go.mod
+++ b/go.mod
@@ -1,6 +1,6 @@
module github.com/vmware-tanzu/velero

-go 1.18
+go 1.20

require (
    cloud.google.com/go/storage v1.30.1
diff --git a/pkg/builder/data_download_builder.go b/pkg/builder/data_download_builder.go
index c564c80cff..9a85c79056 100644
--- a/pkg/builder/data_download_builder.go
+++ b/pkg/builder/data_download_builder.go
@@ -110,3 +110,9 @@ func (d *DataDownloadBuilder) ObjectMeta(opts ...ObjectMetaOpt) *DataDownloadBui
    return d
}
+
+// StartTimestamp sets the DataDownload's StartTimestamp.
+func (d *DataDownloadBuilder) StartTimestamp(startTime *metav1.Time) *DataDownloadBuilder {
+    d.object.Status.StartTimestamp = startTime
+    return d
+}
diff --git a/pkg/builder/data_upload_builder.go b/pkg/builder/data_upload_builder.go
index a844ef6ef8..cb5d0b2de3 100644
--- a/pkg/builder/data_upload_builder.go
+++ b/pkg/builder/data_upload_builder.go
@@ -113,3 +113,9 @@ func (d *DataUploadBuilder) CSISnapshot(cSISnapshot *velerov2alpha1api.CSISnapsh
    d.object.Spec.CSISnapshot = cSISnapshot
    return d
}
+
+// StartTimestamp sets the DataUpload's StartTimestamp.
+func (d *DataUploadBuilder) StartTimestamp(startTime *metav1.Time) *DataUploadBuilder {
+    d.object.Status.StartTimestamp = startTime
+    return d
+}
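The two StartTimestamp builder methods above chain with the existing builder calls. As a rough sketch of how the tests later in this diff use them (assuming the package's ForDataUpload constructor; the namespace and name here are placeholders), a test can construct a DataUpload that has been stuck in the Accepted phase long enough to trip the new prepare timeout:

    // Hypothetical fixture: an Accepted DataUpload whose preparation began
    // 35 minutes ago, i.e. past the default 30-minute prepare timeout.
    du := builder.ForDataUpload("velero", "test-du").
        Phase(velerov2alpha1api.DataUploadPhaseAccepted).
        StartTimestamp(&metav1.Time{Time: time.Now().Add(-35 * time.Minute)}).
        Result()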
Default is 30 minutes.") return command } @@ -256,11 +260,11 @@ func (s *nodeAgentServer) run() { s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller") } - if err = controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.logger).SetupWithManager(s.mgr); err != nil { + if err = controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger).SetupWithManager(s.mgr); err != nil { s.logger.WithError(err).Fatal("Unable to create the data upload controller") } - if err = controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, repoEnsurer, credentialGetter, s.nodeName, s.logger).SetupWithManager(s.mgr); err != nil { + if err = controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger).SetupWithManager(s.mgr); err != nil { s.logger.WithError(err).Fatal("Unable to create the data download controller") } diff --git a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go index 51be1ab9be..d7e3e3e266 100644 --- a/pkg/controller/data_download_controller.go +++ b/pkg/controller/data_download_controller.go @@ -62,10 +62,11 @@ type DataDownloadReconciler struct { nodeName string repositoryEnsurer *repository.Ensurer dataPathMgr *datapath.Manager + preparingTimeout time.Duration } func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Interface, - repoEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, logger logrus.FieldLogger) *DataDownloadReconciler { + repoEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger) *DataDownloadReconciler { return &DataDownloadReconciler{ client: client, kubeClient: kubeClient, @@ -77,6 +78,7 @@ func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Inter repositoryEnsurer: repoEnsurer, restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger), dataPathMgr: datapath.NewManager(1), + preparingTimeout: preparingTimeout, } } @@ -143,6 +145,14 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request } log.Info("Restore is exposed") + return ctrl.Result{}, nil + } else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted { + if dd.Status.StartTimestamp != nil { + if time.Since(dd.Status.StartTimestamp.Time) >= r.preparingTimeout { + r.onPrepareTimeout(ctx, dd) + } + } + return ctrl.Result{}, nil } else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared { log.Info("Data download is prepared") @@ -184,7 +194,6 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request // Update status to InProgress original := dd.DeepCopy() dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress - dd.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()} if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil { log.WithError(err).Error("Unable to update status to in progress") return ctrl.Result{}, err @@ -345,8 +354,15 @@ func (r *DataDownloadReconciler) OnDataDownloadProgress(ctx context.Context, nam // re-enqueue the previous related request once the 
@@ -345,8 +354,15 @@ func (r *DataDownloadReconciler) OnDataDownloadProgress(ctx context.Context, nam
// re-enqueue the previous related request once the related pod is in running status to keep going on the rest logic. and below logic will avoid handling the unwanted
// pod status and also avoid block others CR handling
func (r *DataDownloadReconciler) SetupWithManager(mgr ctrl.Manager) error {
+   s := kube.NewPeriodicalEnqueueSource(r.logger, r.client, &velerov2alpha1api.DataDownloadList{}, preparingMonitorFrequency, kube.PeriodicalEnqueueSourceOption{})
+   gp := kube.NewGenericEventPredicate(func(object client.Object) bool {
+       dd := object.(*velerov2alpha1api.DataDownload)
+       return (dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted)
+   })
+
    return ctrl.NewControllerManagedBy(mgr).
        For(&velerov2alpha1api.DataDownload{}).
+       Watches(s, nil, builder.WithPredicates(gp)).
        Watches(&source.Kind{Type: &v1.Pod{}}, kube.EnqueueRequestsFromMapUpdateFunc(r.findSnapshotRestoreForPod), builder.WithPredicates(predicate.Funcs{
            UpdateFunc: func(ue event.UpdateEvent) bool {
@@ -400,9 +416,15 @@ func (r *DataDownloadReconciler) findSnapshotRestoreForPod(podObj client.Object)
    requests := make([]reconcile.Request, 1)

    r.logger.WithField("Restore pod", pod.Name).Infof("Preparing data download %s", dd.Name)
-   err = r.patchDataDownload(context.Background(), dd, r.prepareDataDownload)
-   if err != nil {
-       r.logger.WithField("Restore pod", pod.Name).WithError(err).Error("unable to patch data download")
+
+   // we don't expect anyone else to update the CR during the Prepare process
+   updated, err := r.exclusiveUpdateDataDownload(context.Background(), dd, r.prepareDataDownload)
+   if err != nil || !updated {
+       r.logger.WithFields(logrus.Fields{
+           "Datadownload": dd.Name,
+           "Restore pod":  pod.Name,
+           "updated":      updated,
+       }).WithError(err).Warn("failed to update datadownload, prepare will halt for this datadownload")
        return []reconcile.Request{}
    }
@@ -416,16 +438,6 @@ func (r *DataDownloadReconciler) findSnapshotRestoreForPod(podObj client.Object)
    return requests
}

-func (r *DataDownloadReconciler) patchDataDownload(ctx context.Context, req *velerov2alpha1api.DataDownload, mutate func(*velerov2alpha1api.DataDownload)) error {
-   original := req.DeepCopy()
-   mutate(req)
-   if err := r.client.Patch(ctx, req, client.MergeFrom(original)); err != nil {
-       return errors.Wrap(err, "error patching data download")
-   }
-
-   return nil
-}
-
func (r *DataDownloadReconciler) prepareDataDownload(ssb *velerov2alpha1api.DataDownload) {
    ssb.Status.Phase = velerov2alpha1api.DataDownloadPhasePrepared
    ssb.Status.Node = r.nodeName
@@ -453,17 +465,62 @@ func (r *DataDownloadReconciler) updateStatusToFailed(ctx context.Context, dd *v
}

func (r *DataDownloadReconciler) acceptDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload) (bool, error) {
-   updated := dd.DeepCopy()
-   updated.Status.Phase = velerov2alpha1api.DataDownloadPhaseAccepted
+   r.logger.Infof("Accepting data download %s", dd.Name)

-   r.logger.Infof("Accepting snapshot restore %s", dd.Name)
    // For all data download controller in each node-agent will try to update download CR, and only one controller will success,
    // and the success one could handle later logic
+   succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dd *velerov2alpha1api.DataDownload) {
+       dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseAccepted
+       dd.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
+   })
+
+   if err != nil {
+       return false, err
+   }
+
+   if succeeded {
+       r.logger.WithField("DataDownload", dd.Name).Infof("This datadownload has been accepted by %s", r.nodeName)
+       return true, nil
+   }
+
+   r.logger.WithField("DataDownload", dd.Name).Info("This datadownload has been accepted by others")
+   return false, nil
+}
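Both the accept path and the prepare path now go through exclusiveUpdateDataDownload, which leans on Kubernetes optimistic concurrency: every node-agent that sees the CR races to Update it, the API server rejects all but one writer with a Conflict, and the Conflict is swallowed as "someone else won" rather than surfaced as an error. The shape of the pattern as a generic sketch (hypothetical helper; the real functions are typed to the DataDownload/DataUpload CRs and DeepCopy before mutating):

    // tryExclusiveUpdate races this writer's mutation against other controllers.
    // (true, nil)  -> our Update landed; we own the follow-up work.
    // (false, nil) -> another writer won the race (Conflict is not an error here).
    func tryExclusiveUpdate(ctx context.Context, c client.Client, obj client.Object, mutate func()) (bool, error) {
        mutate()
        err := c.Update(ctx, obj)
        if err == nil {
            return true, nil
        }
        if apierrors.IsConflict(err) {
            return false, nil
        }
        return false, err
    }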
r.logger.WithField("DataDownload", dd.Name).Info("This datadownload has been accepted by others") + return false, nil +} + +func (r *DataDownloadReconciler) onPrepareTimeout(ctx context.Context, dd *velerov2alpha1api.DataDownload) { + log := r.logger.WithField("DataDownload", dd.Name) + + log.Info("Timeout happened for preparing datadownload") + + succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dd *velerov2alpha1api.DataDownload) { + dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseFailed + dd.Status.Message = "timeout on preparing data download" + }) + + if err != nil { + log.WithError(err).Warn("Failed to update datadownload") + return + } + + if !succeeded { + log.Warn("Dataupload has been updated by others") + return + } + + r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd)) + + log.Info("Dataupload has been cleaned up") +} + +func (r *DataDownloadReconciler) exclusiveUpdateDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload, + updateFunc func(*velerov2alpha1api.DataDownload)) (bool, error) { + updated := dd.DeepCopy() + updateFunc(updated) + err := r.client.Update(ctx, updated) if err == nil { return true, nil } else if apierrors.IsConflict(err) { - r.logger.WithField("DataDownload", dd.Name).Error("This data download restore has been accepted by others") return false, nil } else { return false, err diff --git a/pkg/controller/data_download_controller_test.go b/pkg/controller/data_download_controller_test.go index 773112207a..8447a9eb36 100644 --- a/pkg/controller/data_download_controller_test.go +++ b/pkg/controller/data_download_controller_test.go @@ -29,6 +29,7 @@ import ( "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" clientgofake "k8s.io/client-go/kubernetes/fake" @@ -65,6 +66,29 @@ func dataDownloadBuilder() *builder.DataDownloadBuilder { } func initDataDownloadReconciler(objects []runtime.Object, needError ...bool) (*DataDownloadReconciler, error) { + var errs []error = make([]error, 4) + if len(needError) == 4 { + if needError[0] { + errs[0] = fmt.Errorf("Get error") + } + + if needError[1] { + errs[1] = fmt.Errorf("Create error") + } + + if needError[2] { + errs[2] = fmt.Errorf("Update error") + } + + if needError[3] { + errs[3] = fmt.Errorf("Patch error") + } + } + + return initDataDownloadReconcilerWithError(objects, errs...) 
+}
+
+func initDataDownloadReconcilerWithError(objects []runtime.Object, needError ...error) (*DataDownloadReconciler, error) {
    scheme := runtime.NewScheme()
    err := velerov1api.AddToScheme(scheme)
    if err != nil {
@@ -112,7 +136,7 @@ func initDataDownloadReconciler(objects []runtime.Object, needError ...bool) (*D
    if err != nil {
        return nil, err
    }
-   return NewDataDownloadReconciler(fakeClient, fakeKubeClient, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", velerotest.NewLogger()), nil
+   return NewDataDownloadReconciler(fakeClient, fakeKubeClient, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", time.Minute*5, velerotest.NewLogger()), nil
}

func TestDataDownloadReconcile(t *testing.T) {
@@ -132,6 +156,7 @@
        notMockCleanUp    bool
        mockCancel        bool
        mockClose         bool
+       expected          *velerov2alpha1api.DataDownload
        expectedStatusMsg string
        expectedResult    *ctrl.Result
    }{
@@ -215,7 +240,7 @@
            dd:                builder.ForDataDownload("test-ns", dataDownloadName).Result(),
            targetPVC:         builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
            needErrs:          []bool{true, false, false, false},
-           expectedStatusMsg: "Create error",
+           expectedStatusMsg: "Get error",
        },
        {
            name: "Unsupported dataDownload type",
@@ -246,6 +271,11 @@
            expectedStatusMsg: "Error to expose restore exposer",
            isExposeErr:       true,
        },
+       {
+           name:     "prepare timeout",
+           dd:       dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseAccepted).StartTimestamp(&metav1.Time{Time: time.Now().Add(-time.Minute * 5)}).Result(),
+           expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseFailed).Result(),
+       },
    }

    for _, test := range tests {
@@ -345,6 +375,11 @@
                Namespace: test.dd.Namespace,
            }, &dd)

+           if test.expected != nil {
+               require.NoError(t, err)
+               assert.Equal(t, test.expected.Status.Phase, dd.Status.Phase)
+           }
+
            if test.isGetExposeErr {
                assert.Contains(t, dd.Status.Message, test.expectedStatusMsg)
            }
@@ -580,3 +615,93 @@ func TestFindDataDownloadForPod(t *testing.T) {
        }
    }
}
+
+func TestAcceptDataDownload(t *testing.T) {
+   tests := []struct {
+       name        string
+       dd          *velerov2alpha1api.DataDownload
+       needErrs    []error
+       succeeded   bool
+       expectedErr string
+   }{
+       {
+           name:        "update fail",
+           dd:          dataDownloadBuilder().Result(),
+           needErrs:    []error{nil, nil, fmt.Errorf("fake-update-error"), nil},
+           expectedErr: "fake-update-error",
+       },
+       {
+           name:     "accepted by others",
+           dd:       dataDownloadBuilder().Result(),
+           needErrs: []error{nil, nil, &fakeAPIStatus{metav1.StatusReasonConflict}, nil},
+       },
+       {
+           name:      "succeed",
+           dd:        dataDownloadBuilder().Result(),
+           needErrs:  []error{nil, nil, nil, nil},
+           succeeded: true,
+       },
+   }
+   for _, test := range tests {
+       ctx := context.Background()
+       r, err := initDataDownloadReconcilerWithError(nil, test.needErrs...)
+       require.NoError(t, err)
+
+       err = r.client.Create(ctx, test.dd)
+       require.NoError(t, err)
+
+       succeeded, err := r.acceptDataDownload(ctx, test.dd)
+       assert.Equal(t, test.succeeded, succeeded)
+       if test.expectedErr == "" {
+           assert.NoError(t, err)
+       } else {
+           assert.EqualError(t, err, test.expectedErr)
+       }
+   }
+}
+
+func TestOnDdPrepareTimeout(t *testing.T) {
+   tests := []struct {
+       name     string
+       dd       *velerov2alpha1api.DataDownload
+       needErrs []error
+       expected *velerov2alpha1api.DataDownload
+   }{
+       {
+           name:     "update fail",
+           dd:       dataDownloadBuilder().Result(),
+           needErrs: []error{nil, nil, fmt.Errorf("fake-update-error"), nil},
+           expected: dataDownloadBuilder().Result(),
+       },
+       {
+           name:     "update interrupted",
+           dd:       dataDownloadBuilder().Result(),
+           needErrs: []error{nil, nil, &fakeAPIStatus{metav1.StatusReasonConflict}, nil},
+           expected: dataDownloadBuilder().Result(),
+       },
+       {
+           name:     "succeed",
+           dd:       dataDownloadBuilder().Result(),
+           needErrs: []error{nil, nil, nil, nil},
+           expected: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhaseFailed).Result(),
+       },
+   }
+   for _, test := range tests {
+       ctx := context.Background()
+       r, err := initDataDownloadReconcilerWithError(nil, test.needErrs...)
+       require.NoError(t, err)
+
+       err = r.client.Create(ctx, test.dd)
+       require.NoError(t, err)
+
+       r.onPrepareTimeout(ctx, test.dd)
+
+       dd := velerov2alpha1api.DataDownload{}
+       _ = r.client.Get(ctx, kbclient.ObjectKey{
+           Name:      test.dd.Name,
+           Namespace: test.dd.Namespace,
+       }, &dd)
+
+       assert.Equal(t, test.expected.Status.Phase, dd.Status.Phase)
+   }
+}
diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go
index 101c083704..49ca428e1a 100644
--- a/pkg/controller/data_upload_controller.go
+++ b/pkg/controller/data_upload_controller.go
@@ -55,6 +55,8 @@ import (
const dataMoverType string = "velero"
const dataUploadDownloadRequestor string = "snapshot-data-upload-download"

+const preparingMonitorFrequency time.Duration = time.Minute
+
// DataUploadReconciler reconciles a DataUpload object
type DataUploadReconciler struct {
    client client.Client
@@ -68,11 +70,12 @@
    logger              logrus.FieldLogger
    snapshotExposerList map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer
    dataPathMgr         *datapath.Manager
+   preparingTimeout    time.Duration
}

func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interface,
    csiSnapshotClient snapshotter.SnapshotV1Interface, repoEnsurer *repository.Ensurer, clock clocks.WithTickerAndDelayedExecution,
-   cred *credentials.CredentialGetter, nodeName string, fs filesystem.Interface, log logrus.FieldLogger) *DataUploadReconciler {
+   cred *credentials.CredentialGetter, nodeName string, fs filesystem.Interface, preparingTimeout time.Duration, log logrus.FieldLogger) *DataUploadReconciler {
    return &DataUploadReconciler{
        client:     client,
        kubeClient: kubeClient,
@@ -85,6 +88,7 @@ func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interfa
        repoEnsurer:         repoEnsurer,
        snapshotExposerList: map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{velerov2alpha1api.SnapshotTypeCSI: exposer.NewCSISnapshotExposer(kubeClient, csiSnapshotClient, log)},
        dataPathMgr:         datapath.NewManager(1),
+       preparingTimeout:    preparingTimeout,
    }
}
@@ -143,6 +147,14 @@
        // ep.Expose() will trigger to create one pod whose volume is restored by a given volume snapshot,
        // but the pod maybe is not in the same node of the current controller, so we need to return it here.
        // And then only the controller who is in the same node could do the rest work.
+       return ctrl.Result{}, nil
+   } else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted {
+       if du.Status.StartTimestamp != nil {
+           if time.Since(du.Status.StartTimestamp.Time) >= r.preparingTimeout {
+               r.onPrepareTimeout(ctx, &du)
+           }
+       }
+
        return ctrl.Result{}, nil
    } else if du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared {
        log.Info("Data upload is prepared")
@@ -183,7 +195,6 @@
    // Update status to InProgress
    original := du.DeepCopy()
    du.Status.Phase = velerov2alpha1api.DataUploadPhaseInProgress
-   du.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
    if err := r.client.Patch(ctx, &du, client.MergeFrom(original)); err != nil {
        return r.errorOut(ctx, &du, err, "error updating dataupload status", log)
    }
@@ -363,8 +374,15 @@ func (r *DataUploadReconciler) OnDataUploadProgress(ctx context.Context, namespa
// re-enqueue the previous related request once the related pod is in running status to keep going on the rest logic. and below logic will avoid handling the unwanted
// pod status and also avoid block others CR handling
func (r *DataUploadReconciler) SetupWithManager(mgr ctrl.Manager) error {
+   s := kube.NewPeriodicalEnqueueSource(r.logger, r.client, &velerov2alpha1api.DataUploadList{}, preparingMonitorFrequency, kube.PeriodicalEnqueueSourceOption{})
+   gp := kube.NewGenericEventPredicate(func(object client.Object) bool {
+       du := object.(*velerov2alpha1api.DataUpload)
+       return (du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted)
+   })
+
    return ctrl.NewControllerManagedBy(mgr).
        For(&velerov2alpha1api.DataUpload{}).
+       Watches(s, nil, builder.WithPredicates(gp)).
        Watches(&source.Kind{Type: &corev1.Pod{}}, kube.EnqueueRequestsFromMapUpdateFunc(r.findDataUploadForPod), builder.WithPredicates(predicate.Funcs{
            UpdateFunc: func(ue event.UpdateEvent) bool {
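Without this extra event source the controller only reconciles on CR and pod events, so an Accepted CR whose exposed pod never appears would sit forever. The PeriodicalEnqueueSource re-enqueues every CR once per preparingMonitorFrequency (one minute), and the generic-event predicate narrows that stream to CRs still in the Accepted phase, which is exactly the state the timeout check in Reconcile handles. In isolation the wiring looks roughly like this (sketch only; the real chain also watches the backup pods before completing the builder, and kube is Velero's pkg/util/kube):

    // One-minute heartbeat over all DataUploads, filtered to Accepted ones.
    s := kube.NewPeriodicalEnqueueSource(logger, mgrClient,
        &velerov2alpha1api.DataUploadList{}, time.Minute, kube.PeriodicalEnqueueSourceOption{})
    acceptedOnly := kube.NewGenericEventPredicate(func(obj client.Object) bool {
        return obj.(*velerov2alpha1api.DataUpload).Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted
    })
    // The periodical source enqueues keys itself, so no event handler is passed.
    err := ctrl.NewControllerManagedBy(mgr).
        For(&velerov2alpha1api.DataUpload{}).
        Watches(s, nil, builder.WithPredicates(acceptedOnly)).
        Complete(r)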
Watches(&source.Kind{Type: &corev1.Pod{}}, kube.EnqueueRequestsFromMapUpdateFunc(r.findDataUploadForPod), builder.WithPredicates(predicate.Funcs{ UpdateFunc: func(ue event.UpdateEvent) bool { @@ -416,8 +434,15 @@ func (r *DataUploadReconciler) findDataUploadForPod(podObj client.Object) []reco } r.logger.WithField("Backup pod", pod.Name).Infof("Preparing dataupload %s", du.Name) - if err := r.patchDataUpload(context.Background(), du, r.prepareDataUpload); err != nil { - r.logger.WithField("Backup pod", pod.Name).WithError(err).Error("failed to patch dataupload") + + // we don't expect anyone else update the CR during the Prepare process + updated, err := r.exclusiveUpdateDataUpload(context.Background(), du, r.prepareDataUpload) + if err != nil || !updated { + r.logger.WithFields(logrus.Fields{ + "Dataupload": du.Name, + "Backup pod": pod.Name, + "updated": updated, + }).WithError(err).Warn("failed to patch dataupload, prepare will halt for this dataupload") return []reconcile.Request{} } @@ -430,16 +455,6 @@ func (r *DataUploadReconciler) findDataUploadForPod(podObj client.Object) []reco return []reconcile.Request{requests} } -func (r *DataUploadReconciler) patchDataUpload(ctx context.Context, req *velerov2alpha1api.DataUpload, mutate func(*velerov2alpha1api.DataUpload)) error { - original := req.DeepCopy() - mutate(req) - if err := r.client.Patch(ctx, req, client.MergeFrom(original)); err != nil { - return errors.Wrap(err, "error patching DataUpload") - } - - return nil -} - func (r *DataUploadReconciler) prepareDataUpload(du *velerov2alpha1api.DataUpload) { du.Status.Phase = velerov2alpha1api.DataUploadPhasePrepared du.Status.Node = r.nodeName @@ -475,19 +490,73 @@ func (r *DataUploadReconciler) updateStatusToFailed(ctx context.Context, du *vel } func (r *DataUploadReconciler) acceptDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload) (bool, error) { - updated := du.DeepCopy() - updated.Status.Phase = velerov2alpha1api.DataUploadPhaseAccepted - - r.logger.Infof("Accepting snapshot backup %s", du.Name) + r.logger.Infof("Accepting data upload %s", du.Name) // For all data upload controller in each node-agent will try to update dataupload CR, and only one controller will success, // and the success one could handle later logic + succeeded, err := r.exclusiveUpdateDataUpload(ctx, du, func(du *velerov2alpha1api.DataUpload) { + du.Status.Phase = velerov2alpha1api.DataUploadPhaseAccepted + du.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()} + }) + + if err != nil { + return false, err + } + + if succeeded { + r.logger.WithField("Dataupload", du.Name).Infof("This datauplod has been accepted by %s", r.nodeName) + return true, nil + } + + r.logger.WithField("Dataupload", du.Name).Info("This datauplod has been accepted by others") + return false, nil +} + +func (r *DataUploadReconciler) onPrepareTimeout(ctx context.Context, du *velerov2alpha1api.DataUpload) { + log := r.logger.WithField("Dataupload", du.Name) + + log.Info("Timeout happened for preparing dataupload") + + succeeded, err := r.exclusiveUpdateDataUpload(ctx, du, func(du *velerov2alpha1api.DataUpload) { + du.Status.Phase = velerov2alpha1api.DataUploadPhaseFailed + du.Status.Message = "timeout on preparing data upload" + }) + + if err != nil { + log.WithError(err).Warn("Failed to update dataupload") + return + } + + if !succeeded { + log.Warn("Dataupload has been updated by others") + return + } + + ep, ok := r.snapshotExposerList[du.Spec.SnapshotType] + if !ok { + log.WithError(fmt.Errorf("%v type of snapshot exposer 
is not exist", du.Spec.SnapshotType)). + Warn("Failed to clean up resources on canceled") + } else { + var volumeSnapshotName string + if du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI { // Other exposer should have another condition + volumeSnapshotName = du.Spec.CSISnapshot.VolumeSnapshot + } + + ep.CleanUp(ctx, getOwnerObject(du), volumeSnapshotName, du.Spec.SourceNamespace) + + log.Info("Dataupload has been cleaned up") + } +} + +func (r *DataUploadReconciler) exclusiveUpdateDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload, + updateFunc func(*velerov2alpha1api.DataUpload)) (bool, error) { + updated := du.DeepCopy() + updateFunc(updated) + err := r.client.Update(ctx, updated) if err == nil { - r.logger.WithField("Dataupload", du.Name).Infof("This datauplod backup has been accepted by %s", r.nodeName) return true, nil } else if apierrors.IsConflict(err) { - r.logger.WithField("Dataupload", du.Name).Info("This datauplod backup has been accepted by others") return false, nil } else { return false, err diff --git a/pkg/controller/data_upload_controller_test.go b/pkg/controller/data_upload_controller_test.go index 654e07531f..e7a3b476f9 100644 --- a/pkg/controller/data_upload_controller_test.go +++ b/pkg/controller/data_upload_controller_test.go @@ -58,45 +58,68 @@ const fakeSnapshotType velerov2alpha1api.SnapshotType = "fake-snapshot" type FakeClient struct { kbclient.Client - getError bool - createError bool - updateError bool - patchError bool + getError error + createError error + updateError error + patchError error } func (c *FakeClient) Get(ctx context.Context, key kbclient.ObjectKey, obj kbclient.Object) error { - if c.getError { - return fmt.Errorf("Create error") + if c.getError != nil { + return c.getError } return c.Client.Get(ctx, key, obj) } func (c *FakeClient) Create(ctx context.Context, obj kbclient.Object, opts ...kbclient.CreateOption) error { - if c.createError { - return fmt.Errorf("Create error") + if c.createError != nil { + return c.createError } return c.Client.Create(ctx, obj, opts...) } func (c *FakeClient) Update(ctx context.Context, obj kbclient.Object, opts ...kbclient.UpdateOption) error { - if c.updateError { - return fmt.Errorf("Update error") + if c.updateError != nil { + return c.updateError } return c.Client.Update(ctx, obj, opts...) } func (c *FakeClient) Patch(ctx context.Context, obj kbclient.Object, patch kbclient.Patch, opts ...kbclient.PatchOption) error { - if c.patchError { - return fmt.Errorf("Patch error") + if c.patchError != nil { + return c.patchError } return c.Client.Patch(ctx, obj, patch, opts...) } func initDataUploaderReconciler(needError ...bool) (*DataUploadReconciler, error) { + var errs []error = make([]error, 4) + if len(needError) == 4 { + if needError[0] { + errs[0] = fmt.Errorf("Get error") + } + + if needError[1] { + errs[1] = fmt.Errorf("Create error") + } + + if needError[2] { + errs[2] = fmt.Errorf("Update error") + } + + if needError[3] { + errs[3] = fmt.Errorf("Patch error") + } + } + + return initDataUploaderReconcilerWithError(errs...) 
+
+func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconciler, error) {
    vscName := "fake-vsc"
    vsObject := &snapshotv1api.VolumeSnapshot{
        ObjectMeta: metav1.ObjectMeta{
@@ -170,7 +193,7 @@
    if err != nil {
        return nil, err
    }
    return NewDataUploadReconciler(fakeClient, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), nil,
-       testclocks.NewFakeClock(now), &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", fakeFS, velerotest.NewLogger()), nil
+       testclocks.NewFakeClock(now), &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", fakeFS, time.Minute*5, velerotest.NewLogger()), nil
}

func dataUploadBuilder() *builder.DataUploadBuilder {
@@ -277,7 +300,7 @@
            expectedProcessed: false,
            expected:          nil,
            expectedRequeue:   ctrl.Result{},
-           expectedErrMsg:    "getting DataUpload: Create error",
+           expectedErrMsg:    "getting DataUpload: Get error",
            needErrs:          []bool{true, false, false, false},
        }, {
            name: "Unsupported data mover type",
@@ -339,6 +362,11 @@
            expected:        dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(),
            expectedRequeue: ctrl.Result{Requeue: true, RequeueAfter: time.Minute},
        },
+       {
+           name:     "prepare timeout",
+           du:       dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).SnapshotType(fakeSnapshotType).StartTimestamp(&metav1.Time{Time: time.Now().Add(-time.Minute * 5)}).Result(),
+           expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(),
+       },
    }

    for _, test := range tests {
@@ -599,3 +627,107 @@
        }
    }
}
+
+type fakeAPIStatus struct {
+   reason metav1.StatusReason
+}
+
+func (f *fakeAPIStatus) Status() metav1.Status {
+   return metav1.Status{
+       Reason: f.reason,
+   }
+}
+
+func (f *fakeAPIStatus) Error() string {
+   return string(f.reason)
+}
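A note on why fakeAPIStatus is enough to simulate the accept race: apierrors.IsConflict does not match on error strings; it checks whether the error implements apimachinery's APIStatus interface (a Status() metav1.Status method) and inspects the reason. So any minimal type like the one above works:

    // A StatusReasonConflict therefore reads as "another writer won the race".
    var err error = &fakeAPIStatus{metav1.StatusReasonConflict}
    fmt.Println(apierrors.IsConflict(err)) // true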
+
+func TestAcceptDataUpload(t *testing.T) {
+   tests := []struct {
+       name        string
+       du          *velerov2alpha1api.DataUpload
+       needErrs    []error
+       succeeded   bool
+       expectedErr string
+   }{
+       {
+           name:        "update fail",
+           du:          dataUploadBuilder().Result(),
+           needErrs:    []error{nil, nil, fmt.Errorf("fake-update-error"), nil},
+           expectedErr: "fake-update-error",
+       },
+       {
+           name:     "accepted by others",
+           du:       dataUploadBuilder().Result(),
+           needErrs: []error{nil, nil, &fakeAPIStatus{metav1.StatusReasonConflict}, nil},
+       },
+       {
+           name:      "succeed",
+           du:        dataUploadBuilder().Result(),
+           needErrs:  []error{nil, nil, nil, nil},
+           succeeded: true,
+       },
+   }
+   for _, test := range tests {
+       ctx := context.Background()
+       r, err := initDataUploaderReconcilerWithError(test.needErrs...)
+       require.NoError(t, err)
+
+       err = r.client.Create(ctx, test.du)
+       require.NoError(t, err)
+
+       succeeded, err := r.acceptDataUpload(ctx, test.du)
+       assert.Equal(t, test.succeeded, succeeded)
+       if test.expectedErr == "" {
+           assert.NoError(t, err)
+       } else {
+           assert.EqualError(t, err, test.expectedErr)
+       }
+   }
+}
+
+func TestOnDuPrepareTimeout(t *testing.T) {
+   tests := []struct {
+       name     string
+       du       *velerov2alpha1api.DataUpload
+       needErrs []error
+       expected *velerov2alpha1api.DataUpload
+   }{
+       {
+           name:     "update fail",
+           du:       dataUploadBuilder().Result(),
+           needErrs: []error{nil, nil, fmt.Errorf("fake-update-error"), nil},
+           expected: dataUploadBuilder().Result(),
+       },
+       {
+           name:     "update interrupted",
+           du:       dataUploadBuilder().Result(),
+           needErrs: []error{nil, nil, &fakeAPIStatus{metav1.StatusReasonConflict}, nil},
+           expected: dataUploadBuilder().Result(),
+       },
+       {
+           name:     "succeed",
+           du:       dataUploadBuilder().Result(),
+           needErrs: []error{nil, nil, nil, nil},
+           expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).Result(),
+       },
+   }
+   for _, test := range tests {
+       ctx := context.Background()
+       r, err := initDataUploaderReconcilerWithError(test.needErrs...)
+       require.NoError(t, err)
+
+       err = r.client.Create(ctx, test.du)
+       require.NoError(t, err)
+
+       r.onPrepareTimeout(ctx, test.du)
+
+       du := velerov2alpha1api.DataUpload{}
+       _ = r.client.Get(ctx, kbclient.ObjectKey{
+           Name:      test.du.Name,
+           Namespace: test.du.Namespace,
+       }, &du)
+
+       assert.Equal(t, test.expected.Status.Phase, du.Status.Phase)
+   }
+}