Skip to content

Commit

Permalink
add wait timeout for expose prepare
Browse files Browse the repository at this point in the history
Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
  • Loading branch information
Lyndon-Li committed Jul 7, 2023
1 parent 7deae4c commit 76a9936
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 24 deletions.
69 changes: 64 additions & 5 deletions pkg/controller/data_download_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,14 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
}
log.Info("Restore is exposed")

return ctrl.Result{}, nil
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted {
if dd.Status.StartTimestamp != nil {
if time.Since(dd.Status.StartTimestamp.Time) >= preparingTimeout {
r.onPrepareTimeout(ctx, dd)
}
}

return ctrl.Result{}, nil
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared {
log.Info("Data download is prepared")
Expand Down Expand Up @@ -184,7 +192,6 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
// Update status to InProgress
original := dd.DeepCopy()
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress
dd.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("Unable to update status to in progress")
return ctrl.Result{}, err
Expand Down Expand Up @@ -345,8 +352,15 @@ func (r *DataDownloadReconciler) OnDataDownloadProgress(ctx context.Context, nam
// re-enqueue the previous related request once the related pod is in running status to keep going on the rest logic. and below logic will avoid handling the unwanted
// pod status and also avoid block others CR handling
func (r *DataDownloadReconciler) SetupWithManager(mgr ctrl.Manager) error {
s := kube.NewPeriodicalEnqueueSource(r.logger, r.client, &velerov2alpha1api.DataDownloadList{}, preparingMonitorFrequency, kube.PeriodicalEnqueueSourceOption{})
gp := kube.NewGenericEventPredicate(func(object client.Object) bool {
dd := object.(*velerov2alpha1api.DataDownload)
return (dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted)
})

return ctrl.NewControllerManagedBy(mgr).
For(&velerov2alpha1api.DataDownload{}).
Watches(s, nil, builder.WithPredicates(gp)).
Watches(&source.Kind{Type: &v1.Pod{}}, kube.EnqueueRequestsFromMapUpdateFunc(r.findSnapshotRestoreForPod),
builder.WithPredicates(predicate.Funcs{
UpdateFunc: func(ue event.UpdateEvent) bool {
Expand Down Expand Up @@ -453,17 +467,62 @@ func (r *DataDownloadReconciler) updateStatusToFailed(ctx context.Context, dd *v
}

// acceptDataDownload atomically marks the DataDownload CR as Accepted.
// Every node-agent's controller races to claim the CR; the exclusive update
// fails with a conflict for all but one of them, so exactly one controller
// "wins" and continues processing. Returns true when this controller won.
func (r *DataDownloadReconciler) acceptDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload) (bool, error) {
	r.logger.Infof("Accepting data download %s", dd.Name)

	// For all data download controller in each node-agent will try to update download CR, and only one controller will success,
	// and the success one could handle later logic
	succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dd *velerov2alpha1api.DataDownload) {
		dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseAccepted
		// Record the acceptance time so the prepare-timeout monitor can detect
		// CRs that never leave the Accepted phase.
		dd.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
	})

	if err != nil {
		return false, err
	}

	if succeeded {
		r.logger.WithField("DataDownload", dd.Name).Infof("This datadownload has been accepted by %s", r.nodeName)
		return true, nil
	}

	r.logger.WithField("DataDownload", dd.Name).Info("This datadownload has been accepted by others")
	return false, nil
}

func (r *DataDownloadReconciler) onPrepareTimeout(ctx context.Context, dd *velerov2alpha1api.DataDownload) {
log := r.logger.WithField("DataDownload", dd.Name)

log.Info("Timeout happend for preparing datadownload")

Check failure on line 495 in pkg/controller/data_download_controller.go

View workflow job for this annotation

GitHub Actions / Run Codespell

happend ==> happened, happens, happen

succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dd *velerov2alpha1api.DataDownload) {
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseFailed
dd.Status.Message = "timeout on preparing data download"
})

if err != nil {
log.WithError(err).Warn("Failed to update datadownload")
return
}

if !succeeded {
log.Warn("Dataupload has been updated by others")
return
}

r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd))

log.Info("Dataupload has been cleaned up")
}

func (r *DataDownloadReconciler) exclusiveUpdateDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload,
updateFunc func(*velerov2alpha1api.DataDownload)) (bool, error) {
updated := dd.DeepCopy()
updateFunc(updated)

err := r.client.Update(ctx, updated)
if err == nil {
return true, nil
} else if apierrors.IsConflict(err) {
r.logger.WithField("DataDownload", dd.Name).Error("This data download restore has been accepted by others")
return false, nil
} else {
return false, err
Expand Down
106 changes: 87 additions & 19 deletions pkg/controller/data_upload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ import (
const dataMoverType string = "velero"
const dataUploadDownloadRequestor string = "snapshot-data-upload-download"

const preparingTimeout time.Duration = time.Minute * 5
const preparingMonitorFrequency time.Duration = time.Minute

// DataUploadReconciler reconciles a DataUpload object
type DataUploadReconciler struct {
client client.Client
Expand Down Expand Up @@ -143,6 +146,14 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// ep.Expose() will trigger to create one pod whose volume is restored by a given volume snapshot,
// but the pod maybe is not in the same node of the current controller, so we need to return it here.
// And then only the controller who is in the same node could do the rest work.
return ctrl.Result{}, nil
} else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted {
if du.Status.StartTimestamp != nil {
if time.Since(du.Status.StartTimestamp.Time) >= preparingTimeout {
r.onPrepareTimeout(ctx, &du)
}
}

return ctrl.Result{}, nil
} else if du.Status.Phase == velerov2alpha1api.DataUploadPhasePrepared {
log.Info("Data upload is prepared")
Expand Down Expand Up @@ -183,7 +194,6 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// Update status to InProgress
original := du.DeepCopy()
du.Status.Phase = velerov2alpha1api.DataUploadPhaseInProgress
du.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
if err := r.client.Patch(ctx, &du, client.MergeFrom(original)); err != nil {
return r.errorOut(ctx, &du, err, "error updating dataupload status", log)
}
Expand Down Expand Up @@ -363,8 +373,15 @@ func (r *DataUploadReconciler) OnDataUploadProgress(ctx context.Context, namespa
// re-enqueue the previous related request once the related pod is in running status to keep going on the rest logic. and below logic will avoid handling the unwanted
// pod status and also avoid block others CR handling
func (r *DataUploadReconciler) SetupWithManager(mgr ctrl.Manager) error {
s := kube.NewPeriodicalEnqueueSource(r.logger, r.client, &velerov2alpha1api.DataUploadList{}, preparingMonitorFrequency, kube.PeriodicalEnqueueSourceOption{})
gp := kube.NewGenericEventPredicate(func(object client.Object) bool {
du := object.(*velerov2alpha1api.DataUpload)
return (du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted)
})

return ctrl.NewControllerManagedBy(mgr).
For(&velerov2alpha1api.DataUpload{}).
Watches(s, nil, builder.WithPredicates(gp)).
Watches(&source.Kind{Type: &corev1.Pod{}}, kube.EnqueueRequestsFromMapUpdateFunc(r.findDataUploadForPod),
builder.WithPredicates(predicate.Funcs{
UpdateFunc: func(ue event.UpdateEvent) bool {
Expand Down Expand Up @@ -416,8 +433,15 @@ func (r *DataUploadReconciler) findDataUploadForPod(podObj client.Object) []reco
}

r.logger.WithField("Backup pod", pod.Name).Infof("Preparing dataupload %s", du.Name)
if err := r.patchDataUpload(context.Background(), du, r.prepareDataUpload); err != nil {
r.logger.WithField("Backup pod", pod.Name).WithError(err).Error("failed to patch dataupload")

// we don't expect anyone else update the CR during the Prepare process
updated, err := r.exclusiveUpdateDataUpload(context.Background(), du, r.prepareDataUpload)
if err != nil || !updated {
r.logger.WithFields(logrus.Fields{
"Dataupload": du.Name,
"Backup pod": pod.Name,
"updated": updated,
}).WithError(err).Warn("failed to patch dataupload, prepare will halt for this dataupload")
return []reconcile.Request{}
}

Expand All @@ -430,16 +454,6 @@ func (r *DataUploadReconciler) findDataUploadForPod(podObj client.Object) []reco
return []reconcile.Request{requests}
}

// patchDataUpload applies mutate to req and submits the resulting delta to the
// API server as a merge patch computed against the pre-mutation state.
func (r *DataUploadReconciler) patchDataUpload(ctx context.Context, req *velerov2alpha1api.DataUpload, mutate func(*velerov2alpha1api.DataUpload)) error {
	base := req.DeepCopy()
	mutate(req)

	err := r.client.Patch(ctx, req, client.MergeFrom(base))
	if err != nil {
		return errors.Wrap(err, "error patching DataUpload")
	}

	return nil
}

func (r *DataUploadReconciler) prepareDataUpload(du *velerov2alpha1api.DataUpload) {
du.Status.Phase = velerov2alpha1api.DataUploadPhasePrepared
du.Status.Node = r.nodeName
Expand Down Expand Up @@ -475,19 +489,73 @@ func (r *DataUploadReconciler) updateStatusToFailed(ctx context.Context, du *vel
}

// acceptDataUpload atomically marks the DataUpload CR as Accepted.
// Every node-agent's controller races to claim the CR; the exclusive update
// fails with a conflict for all but one of them, so exactly one controller
// "wins" and continues processing. Returns true when this controller won.
func (r *DataUploadReconciler) acceptDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload) (bool, error) {
	r.logger.Infof("Accepting data upload %s", du.Name)

	// For all data upload controller in each node-agent will try to update dataupload CR, and only one controller will success,
	// and the success one could handle later logic
	succeeded, err := r.exclusiveUpdateDataUpload(ctx, du, func(du *velerov2alpha1api.DataUpload) {
		du.Status.Phase = velerov2alpha1api.DataUploadPhaseAccepted
		// Record the acceptance time so the prepare-timeout monitor can detect
		// CRs that never leave the Accepted phase.
		du.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
	})

	if err != nil {
		return false, err
	}

	if succeeded {
		r.logger.WithField("Dataupload", du.Name).Infof("This dataupload has been accepted by %s", r.nodeName)
		return true, nil
	}

	r.logger.WithField("Dataupload", du.Name).Info("This dataupload has been accepted by others")
	return false, nil
}

func (r *DataUploadReconciler) onPrepareTimeout(ctx context.Context, du *velerov2alpha1api.DataUpload) {
log := r.logger.WithField("Dataupload", du.Name)

log.Info("Timeout happend for preparing dataupload")

Check failure on line 517 in pkg/controller/data_upload_controller.go

View workflow job for this annotation

GitHub Actions / Run Codespell

happend ==> happened, happens, happen

succeeded, err := r.exclusiveUpdateDataUpload(ctx, du, func(du *velerov2alpha1api.DataUpload) {
du.Status.Phase = velerov2alpha1api.DataUploadPhaseFailed
du.Status.Message = "timeout on preparing data upload"
})

if err != nil {
log.WithError(err).Warn("Failed to update dataupload")
return
}

if !succeeded {
log.Warn("Dataupload has been updated by others")
return
}

ep, ok := r.snapshotExposerList[du.Spec.SnapshotType]
if !ok {
log.WithError(fmt.Errorf("%v type of snapshot exposer is not exist", du.Spec.SnapshotType)).
Warn("Failed to clean up resources on canceled")
} else {
var volumeSnapshotName string
if du.Spec.SnapshotType == velerov2alpha1api.SnapshotTypeCSI { // Other exposer should have another condition
volumeSnapshotName = du.Spec.CSISnapshot.VolumeSnapshot
}

ep.CleanUp(ctx, getOwnerObject(du), volumeSnapshotName, du.Spec.SourceNamespace)

log.Info("Dataupload has been cleaned up")
}
}

func (r *DataUploadReconciler) exclusiveUpdateDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload,
updateFunc func(*velerov2alpha1api.DataUpload)) (bool, error) {
updated := du.DeepCopy()
updateFunc(updated)

err := r.client.Update(ctx, updated)
if err == nil {
r.logger.WithField("Dataupload", du.Name).Infof("This dataupload backup has been accepted by %s", r.nodeName)
return true, nil
} else if apierrors.IsConflict(err) {
r.logger.WithField("Dataupload", du.Name).Info("This dataupload backup has been accepted by others")
return false, nil
} else {
return false, err
Expand Down

0 comments on commit 76a9936

Please sign in to comment.