Merge pull request #6616 from qiuming-best/add-accept-label
Fix data mover controller bugs
qiuming-best authored Aug 15, 2023
2 parents 713792d + 5485616 commit 411bd54
Showing 7 changed files with 842 additions and 115 deletions.
6 changes: 6 additions & 0 deletions pkg/builder/data_upload_builder.go
@@ -119,3 +119,9 @@ func (d *DataUploadBuilder) StartTimestamp(startTime *metav1.Time) *DataUploadBu
d.object.Status.StartTimestamp = startTime
return d
}

// Labels sets the DataUpload's Labels.
func (d *DataUploadBuilder) Labels(labels map[string]string) *DataUploadBuilder {
d.object.Labels = labels
return d
}
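
For context, the new setter slots into the existing fluent DataUploadBuilder. A minimal test-style sketch of how it might be used, assuming the package's usual ForDataUpload(namespace, name) constructor and Result() accessor; the label key is purely illustrative:

// Sketch only — ForDataUpload/Result are assumed from the builder package's
// existing pattern, and the label key is illustrative, not from this PR.
du := builder.ForDataUpload("velero", "du-1").
	Labels(map[string]string{"velero.io/accepted-by": "node-1"}).
	StartTimestamp(&metav1.Time{Time: time.Now()}).
	Result()
_ = du // e.g. hand to a fake client in a controller test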
24 changes: 15 additions & 9 deletions pkg/cmd/cli/nodeagent/server.go
@@ -368,11 +368,14 @@ func (s *nodeAgentServer) markDataUploadsCancel(r *controller.DataUploadReconcil
if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted ||
du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared ||
du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress {
updated := du.DeepCopy()
updated.Spec.Cancel = true
updated.Status.Message = fmt.Sprintf("found a dataupload with status %q during the node-agent starting, mark it as cancel", du.Status.Phase)
if err := client.Patch(s.ctx, updated, ctrlclient.MergeFrom(&du)); err != nil {
s.logger.WithError(errors.WithStack(err)).Errorf("failed to mark datadownload %q cancel", du.GetName())
err = controller.UpdateDataUploadWithRetry(s.ctx, client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, s.logger.WithField("dataupload", du.Name),
func(dataUpload *velerov2alpha1api.DataUpload) {
dataUpload.Spec.Cancel = true
dataUpload.Status.Message = fmt.Sprintf("found a dataupload with status %q during the node-agent starting, mark it as cancel", du.Status.Phase)
})

if err != nil {
s.logger.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", du.GetName())
continue
}
s.logger.WithField("dataupload", du.GetName()).Warn(du.Status.Message)
@@ -396,10 +399,13 @@ func (s *nodeAgentServer) markDataDownloadsCancel(r *controller.DataDownloadReco
if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted ||
dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared ||
dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
updated := dd.DeepCopy()
updated.Spec.Cancel = true
updated.Status.Message = fmt.Sprintf("found a datadownload with status %q during the node-agent starting, mark it as cancel", dd.Status.Phase)
if err := client.Patch(s.ctx, updated, ctrlclient.MergeFrom(dd)); err != nil {
err = controller.UpdateDataDownloadWithRetry(s.ctx, client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, s.logger.WithField("datadownload", dd.Name),
func(dataDownload *velerov2alpha1api.DataDownload) {
dataDownload.Spec.Cancel = true
dataDownload.Status.Message = fmt.Sprintf("found a datadownload with status %q during the node-agent starting, mark it as cancel", dd.Status.Phase)
})

if err != nil {
s.logger.WithError(errors.WithStack(err)).Errorf("failed to mark datadownload %q cancel", dd.GetName())
continue
}
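
Design note on the change above: the one-shot MergeFrom patch is replaced by the Update*WithRetry helpers, which re-read the CR and retry on write conflicts. client-go's retry.RetryOnConflict gives a comparable shape; the sketch below is only a point of comparison under assumed imports (k8s.io/client-go/util/retry plus the aliases already used in this file), not code from this PR.

// Comparable conflict-retry shape using client-go's helper (illustrative only).
func cancelDataUpload(ctx context.Context, c ctrlclient.Client, key ctrlclient.ObjectKey) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		du := &velerov2alpha1api.DataUpload{}
		if err := c.Get(ctx, key, du); err != nil {
			return err // non-conflict errors abort the retry loop
		}
		du.Spec.Cancel = true
		return c.Update(ctx, du) // a conflict triggers a re-read and another attempt
	})
}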
23 changes: 15 additions & 8 deletions pkg/cmd/server/server.go
@@ -40,6 +40,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
kubeerrs "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
@@ -1107,10 +1108,13 @@ func markDataUploadsCancel(ctx context.Context, client ctrlclient.Client, backup
if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted ||
du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared ||
du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress {
updated := du.DeepCopy()
updated.Spec.Cancel = true
updated.Status.Message = fmt.Sprintf("found a dataupload with status %q during the velero server starting, mark it as cancel", du.Status.Phase)
if err := client.Patch(ctx, updated, ctrlclient.MergeFrom(&du)); err != nil {
err := controller.UpdateDataUploadWithRetry(ctx, client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, log.WithField("dataupload", du.Name),
func(dataUpload *velerov2alpha1api.DataUpload) {
dataUpload.Spec.Cancel = true
dataUpload.Status.Message = fmt.Sprintf("found a dataupload with status %q during the velero server starting, mark it as cancel", du.Status.Phase)
})

if err != nil {
log.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", du.GetName())
continue
}
@@ -1132,10 +1136,13 @@ func markDataDownloadsCancel(ctx context.Context, client ctrlclient.Client, rest
if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted ||
dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared ||
dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
updated := dd.DeepCopy()
updated.Spec.Cancel = true
updated.Status.Message = fmt.Sprintf("found a datadownload with status %q during the velero server starting, mark it as cancel", dd.Status.Phase)
if err := client.Patch(ctx, updated, ctrlclient.MergeFrom(&dd)); err != nil {
err := controller.UpdateDataDownloadWithRetry(ctx, client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, log.WithField("datadownload", dd.Name),
func(dataDownload *velerov2alpha1api.DataDownload) {
dataDownload.Spec.Cancel = true
dataDownload.Status.Message = fmt.Sprintf("found a datadownload with status %q during the velero server starting, mark it as cancel", dd.Status.Phase)
})

if err != nil {
log.WithError(errors.WithStack(err)).Errorf("failed to mark dataupload %q cancel", dd.GetName())
continue
}
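
Note that markDataUploadsCancel above calls controller.UpdateDataUploadWithRetry, whose definition is not among the hunks shown on this page; presumably it mirrors UpdateDataDownloadWithRetry further down. A sketch of that assumed shape (not the merged implementation):

// Assumed DataUpload counterpart of UpdateDataDownloadWithRetry (see below).
func UpdateDataUploadWithRetry(ctx context.Context, client client.Client, namespacedName types.NamespacedName,
	log *logrus.Entry, updateFunc func(dataUpload *velerov2alpha1api.DataUpload)) error {
	return wait.PollUntilWithContext(ctx, time.Second, func(ctx context.Context) (bool, error) {
		du := &velerov2alpha1api.DataUpload{}
		if err := client.Get(ctx, namespacedName, du); err != nil {
			return false, errors.Wrap(err, "getting DataUpload")
		}
		updateFunc(du)
		if updateErr := client.Update(ctx, du); updateErr != nil {
			if apierrors.IsConflict(updateErr) {
				log.Warnf("failed to update dataupload %s/%s, retrying", du.Namespace, du.Name)
				return false, nil
			}
			return false, updateErr
		}
		return true, nil
	})
}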
157 changes: 145 additions & 12 deletions pkg/controller/data_download_controller.go
@@ -27,11 +27,13 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/utils/clock"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -118,6 +120,33 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
return r.errorOut(ctx, dd, errors.New("uninitialized generic exposer"), "uninitialized exposer", log)
}

// Add finalizer
// Logic for clearing resources when the datadownload has been deleted
if dd.DeletionTimestamp.IsZero() { // add the finalizer to every CR at the beginning
if !isDataDownloadInFinalState(dd) && !controllerutil.ContainsFinalizer(dd, dataUploadDownloadFinalizer) {
succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dd *velerov2alpha1api.DataDownload) {
controllerutil.AddFinalizer(dd, dataUploadDownloadFinalizer)
})
if err != nil {
log.Errorf("failed to add finalizer with error %s for %s/%s", err.Error(), dd.Namespace, dd.Name)
return ctrl.Result{}, err
} else if !succeeded {
log.Warnf("failed to add finilizer for %s/%s and will requeue later", dd.Namespace, dd.Name)
return ctrl.Result{Requeue: true}, nil
}
}
} else if controllerutil.ContainsFinalizer(dd, dataUploadDownloadFinalizer) && !dd.Spec.Cancel && !isDataDownloadInFinalState(dd) {
// when the CR is deleted we need to clean up the internal resources created by Velero; here we use the cancel mechanism
// to clean them up rather than deleting them directly, to avoid conflicts with the Expose action
if err := UpdateDataDownloadWithRetry(ctx, r.client, req.NamespacedName, log, func(dataDownload *velerov2alpha1api.DataDownload) {
dataDownload.Spec.Cancel = true
dataDownload.Status.Message = fmt.Sprintf("found a dataupload %s/%s is being deleted, mark it as cancel", dd.Namespace, dd.Name)
}); err != nil {
log.Errorf("failed to set cancel flag with error %s for %s/%s", err.Error(), dd.Namespace, dd.Name)
return ctrl.Result{}, err
}
}

if dd.Status.Phase == "" || dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseNew {
log.Info("Data download starting")

@@ -150,15 +179,44 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
// And then only the controller who is in the same node could do the rest work.
err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), dd.Spec.TargetVolume.PVC, dd.Spec.TargetVolume.Namespace, hostingPodLabels, dd.Spec.OperationTimeout.Duration)
if err != nil {
return r.errorOut(ctx, dd, err, "error to start restore expose", log)
if err := r.client.Get(ctx, req.NamespacedName, dd); err != nil {
if !apierrors.IsNotFound(err) {
return ctrl.Result{}, errors.Wrap(err, "getting DataUpload")
}
}
if isDataDownloadInFinalState(dd) {
log.Warnf("expose snapshot with err %v but it may caused by clean up resources in cancel action", err)
r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd))
return ctrl.Result{}, nil
} else {
return r.errorOut(ctx, dd, err, "error to expose snapshot", log)
}
}

log.Info("Restore is exposed")

// we need to get the CR again because it may have been canceled by the datadownload controller on other
// nodes while doing the expose action; if a cancel is detected we need to clean up the internal
// resources created by velero during the restore.
if err := r.client.Get(ctx, req.NamespacedName, dd); err != nil {
if apierrors.IsNotFound(err) {
log.Debug("Unable to find datadownload")
return ctrl.Result{}, nil
}
return ctrl.Result{}, errors.Wrap(err, "getting datadownload")
}

// resources created in Expose may show up later than the cancel action or the prepare timeout,
// so we need to clean them up again here
if isDataDownloadInFinalState(dd) {
r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd))
}

return ctrl.Result{}, nil
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted {
if dd.Spec.Cancel {
log.Debugf("Data download is been canceled %s in Phase %s", dd.GetName(), dd.Status.Phase)
r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName())
r.TryCancelDataDownload(ctx, dd)
} else if dd.Status.StartTimestamp != nil {
if time.Since(dd.Status.StartTimestamp.Time) >= r.preparingTimeout {
r.onPrepareTimeout(ctx, dd)
@@ -249,7 +307,15 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request

return ctrl.Result{}, nil
} else {
log.Debugf("Data download now is in %s phase and do nothing by current %s controller", dd.Status.Phase, r.nodeName)
// put the finalizer removal here because every CR eventually reaches a final status; we check the finalizer and remove it in the final status
// instead of in an intermediate state
if isDataDownloadInFinalState(dd) && !dd.DeletionTimestamp.IsZero() && controllerutil.ContainsFinalizer(dd, dataUploadDownloadFinalizer) {
original := dd.DeepCopy()
controllerutil.RemoveFinalizer(dd, dataUploadDownloadFinalizer)
if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("error to remove finalizer")
}
}
return ctrl.Result{}, nil
}
}
@@ -353,6 +419,32 @@ func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, na
}
}

func (r *DataDownloadReconciler) TryCancelDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload) {
log := r.logger.WithField("datadownload", dd.Name)
log.Warn("Async fs backup data path canceled")

succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dataDownload *velerov2alpha1api.DataDownload) {
dataDownload.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceled
if dataDownload.Status.StartTimestamp.IsZero() {
dataDownload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
}
dataDownload.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
})

if err != nil {
log.WithError(err).Error("error updating datadownload status")
return
} else if !succeeded {
log.Warn("conflict in updating datadownload status and will try it again later")
return
}

// success update
r.metrics.RegisterDataDownloadCancel(r.nodeName)
r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd))
r.closeDataPath(ctx, dd.Name)
}

func (r *DataDownloadReconciler) OnDataDownloadProgress(ctx context.Context, namespace string, ddName string, progress *uploader.Progress) {
log := r.logger.WithField("datadownload", ddName)

@@ -515,16 +607,28 @@ func (r *DataDownloadReconciler) acceptDataDownload(ctx context.Context, dd *vel

// Every data download controller in each node-agent will try to update the download CR; only one controller will succeed,
// and the successful one handles the later logic
succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dd *velerov2alpha1api.DataDownload) {
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseAccepted
dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
})

updated := dd.DeepCopy()

updateFunc := func(datadownload *velerov2alpha1api.DataDownload) {
datadownload.Status.Phase = velerov2alpha1api.DataDownloadPhaseAccepted
datadownload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
labels := datadownload.GetLabels()
if labels == nil {
labels = make(map[string]string)
}
labels[acceptNodeLabelKey] = r.nodeName
datadownload.SetLabels(labels)
}

succeeded, err := r.exclusiveUpdateDataDownload(ctx, updated, updateFunc)

if err != nil {
return false, err
}

if succeeded {
updateFunc(dd) // if the update succeeded, apply the same changes to the in-memory dd
r.logger.WithField("DataDownload", dd.Name).Infof("This datadownload has been accepted by %s", r.nodeName)
return true, nil
}
@@ -537,7 +641,6 @@ func (r *DataDownloadReconciler) onPrepareTimeout(ctx context.Context, dd *veler
log := r.logger.WithField("DataDownload", dd.Name)

log.Info("Timeout happened for preparing datadownload")

succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dd *velerov2alpha1api.DataDownload) {
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseFailed
dd.Status.Message = "timeout on preparing data download"
@@ -562,13 +665,15 @@ func (r *DataDownloadReconciler) onPrepareTimeout(ctx context.Context, dd *veler

func (r *DataDownloadReconciler) exclusiveUpdateDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload,
updateFunc func(*velerov2alpha1api.DataDownload)) (bool, error) {
updated := dd.DeepCopy()
updateFunc(updated)
updateFunc(dd)

err := r.client.Update(ctx, dd)

err := r.client.Update(ctx, updated)
if err == nil {
return true, nil
} else if apierrors.IsConflict(err) {
}
// the in-memory dd is not rolled back when the update fails
if apierrors.IsConflict(err) {
return false, nil
} else {
return false, err
@@ -614,3 +719,31 @@ func findDataDownloadByPod(client client.Client, pod v1.Pod) (*velerov2alpha1api

return nil, nil
}

func isDataDownloadInFinalState(dd *velerov2alpha1api.DataDownload) bool {
return dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseFailed ||
dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseCanceled ||
dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseCompleted
}

func UpdateDataDownloadWithRetry(ctx context.Context, client client.Client, namespacedName types.NamespacedName, log *logrus.Entry, updateFunc func(dataDownload *velerov2alpha1api.DataDownload)) error {
return wait.PollUntilWithContext(ctx, time.Second, func(ctx context.Context) (done bool, err error) {
dd := &velerov2alpha1api.DataDownload{}
if err := client.Get(ctx, namespacedName, dd); err != nil {
return false, errors.Wrap(err, "getting DataDownload")
}

updateFunc(dd)
updateErr := client.Update(ctx, dd)
if updateErr != nil {
if apierrors.IsConflict(updateErr) {
log.Warnf("failed to update datadownload for %s/%s and will retry it", dd.Namespace, dd.Name)
return false, nil
}
log.Errorf("failed to update datadownload with error %s for %s/%s", updateErr.Error(), dd.Namespace, dd.Name)
return false, updateErr
}

return true, nil
})
}
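
Closing note on the accept label this branch adds: acceptDataDownload above stamps an accept-node label on the CR when a node wins the accept race, so other code can tell which node took ownership. As a usage illustration, a label-selector query could look like the sketch below; the key stands in for acceptNodeLabelKey, whose literal value is not visible in these hunks.

// Sketch: list DataDownloads accepted by a given node (label key illustrative).
func dataDownloadsAcceptedBy(ctx context.Context, c client.Client, node string) (*velerov2alpha1api.DataDownloadList, error) {
	list := &velerov2alpha1api.DataDownloadList{}
	if err := c.List(ctx, list, client.MatchingLabels{"velero.io/accepted-by": node}); err != nil {
		return nil, errors.Wrap(err, "listing datadownloads")
	}
	return list, nil
}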