Skip to content

Commit

Permalink
add wait timeout for expose prepare
Browse files Browse the repository at this point in the history
Signed-off-by: Lyndon-Li <lyonghui@vmware.com>
  • Loading branch information
Lyndon-Li committed Jul 7, 2023
1 parent 7deae4c commit bf2a981
Show file tree
Hide file tree
Showing 7 changed files with 461 additions and 62 deletions.
6 changes: 6 additions & 0 deletions pkg/builder/data_download_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,9 @@ func (d *DataDownloadBuilder) ObjectMeta(opts ...ObjectMetaOpt) *DataDownloadBui

return d
}

// StartTimestamp sets the DataDownload's StartTimestamp.
func (d *DataDownloadBuilder) StartTimestamp(startTime *metav1.Time) *DataDownloadBuilder {
d.object.Status.StartTimestamp = startTime
return d
}
6 changes: 6 additions & 0 deletions pkg/builder/data_upload_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,9 @@ func (d *DataUploadBuilder) CSISnapshot(cSISnapshot *velerov2alpha1api.CSISnapsh
d.object.Spec.CSISnapshot = cSISnapshot
return d
}

// StartTimestamp sets the DataUpload's StartTimestamp.
func (d *DataUploadBuilder) StartTimestamp(startTime *metav1.Time) *DataUploadBuilder {
d.object.Status.StartTimestamp = startTime
return d
}
18 changes: 11 additions & 7 deletions pkg/cmd/cli/nodeagent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,20 +71,23 @@ const (
// files will be written to
defaultCredentialsDirectory = "/tmp/credentials"

defaultResourceTimeout = 10 * time.Minute
defaultResourceTimeout = 10 * time.Minute
defaultDataMoverPrepareTimeout = 30 * time.Minute
)

type nodeAgentServerConfig struct {
metricsAddress string
resourceTimeout time.Duration
metricsAddress string
resourceTimeout time.Duration
dataMoverPrepareTimeout time.Duration
}

func NewServerCommand(f client.Factory) *cobra.Command {
logLevelFlag := logging.LogLevelFlag(logrus.InfoLevel)
formatFlag := logging.NewFormatFlag()
config := nodeAgentServerConfig{
metricsAddress: defaultMetricsAddress,
resourceTimeout: defaultResourceTimeout,
metricsAddress: defaultMetricsAddress,
resourceTimeout: defaultResourceTimeout,
dataMoverPrepareTimeout: defaultDataMoverPrepareTimeout,
}

command := &cobra.Command{
Expand All @@ -110,6 +113,7 @@ func NewServerCommand(f client.Factory) *cobra.Command {
command.Flags().Var(logLevelFlag, "log-level", fmt.Sprintf("The level at which to log. Valid values are %s.", strings.Join(logLevelFlag.AllowedValues(), ", ")))
command.Flags().Var(formatFlag, "log-format", fmt.Sprintf("The format for log output. Valid values are %s.", strings.Join(formatFlag.AllowedValues(), ", ")))
command.Flags().DurationVar(&config.resourceTimeout, "resource-timeout", config.resourceTimeout, "How long to wait for resource processes which are not covered by other specific timeout parameters. Default is 10 minutes.")
command.Flags().DurationVar(&config.dataMoverPrepareTimeout, "data-mover-prepare-timeout", config.dataMoverPrepareTimeout, "How long to wait for preparing a DataUpload/DataDownload. Default is 30 minutes.")

return command
}
Expand Down Expand Up @@ -256,11 +260,11 @@ func (s *nodeAgentServer) run() {
s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller")
}

if err = controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.logger).SetupWithManager(s.mgr); err != nil {
if err = controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger).SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data upload controller")
}

if err = controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, repoEnsurer, credentialGetter, s.nodeName, s.logger).SetupWithManager(s.mgr); err != nil {
if err = controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger).SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data download controller")
}

Expand Down
95 changes: 76 additions & 19 deletions pkg/controller/data_download_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,11 @@ type DataDownloadReconciler struct {
nodeName string
repositoryEnsurer *repository.Ensurer
dataPathMgr *datapath.Manager
preparingTimeout time.Duration
}

func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Interface,
repoEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, logger logrus.FieldLogger) *DataDownloadReconciler {
repoEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger) *DataDownloadReconciler {
return &DataDownloadReconciler{
client: client,
kubeClient: kubeClient,
Expand All @@ -77,6 +78,7 @@ func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Inter
repositoryEnsurer: repoEnsurer,
restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger),
dataPathMgr: datapath.NewManager(1),
preparingTimeout: preparingTimeout,
}
}

Expand Down Expand Up @@ -143,6 +145,14 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
}
log.Info("Restore is exposed")

return ctrl.Result{}, nil
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted {
if dd.Status.StartTimestamp != nil {
if time.Since(dd.Status.StartTimestamp.Time) >= r.preparingTimeout {
r.onPrepareTimeout(ctx, dd)
}
}

return ctrl.Result{}, nil
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared {
log.Info("Data download is prepared")
Expand Down Expand Up @@ -184,7 +194,6 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
// Update status to InProgress
original := dd.DeepCopy()
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress
dd.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("Unable to update status to in progress")
return ctrl.Result{}, err
Expand Down Expand Up @@ -345,8 +354,15 @@ func (r *DataDownloadReconciler) OnDataDownloadProgress(ctx context.Context, nam
// re-enqueue the previous related request once the related pod is in running status to keep going on the rest logic. and below logic will avoid handling the unwanted
// pod status and also avoid block others CR handling
func (r *DataDownloadReconciler) SetupWithManager(mgr ctrl.Manager) error {
s := kube.NewPeriodicalEnqueueSource(r.logger, r.client, &velerov2alpha1api.DataDownloadList{}, preparingMonitorFrequency, kube.PeriodicalEnqueueSourceOption{})
gp := kube.NewGenericEventPredicate(func(object client.Object) bool {
dd := object.(*velerov2alpha1api.DataDownload)
return (dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted)
})

return ctrl.NewControllerManagedBy(mgr).
For(&velerov2alpha1api.DataDownload{}).
Watches(s, nil, builder.WithPredicates(gp)).
Watches(&source.Kind{Type: &v1.Pod{}}, kube.EnqueueRequestsFromMapUpdateFunc(r.findSnapshotRestoreForPod),
builder.WithPredicates(predicate.Funcs{
UpdateFunc: func(ue event.UpdateEvent) bool {
Expand Down Expand Up @@ -400,9 +416,15 @@ func (r *DataDownloadReconciler) findSnapshotRestoreForPod(podObj client.Object)
requests := make([]reconcile.Request, 1)

r.logger.WithField("Restore pod", pod.Name).Infof("Preparing data download %s", dd.Name)
err = r.patchDataDownload(context.Background(), dd, r.prepareDataDownload)
if err != nil {
r.logger.WithField("Restore pod", pod.Name).WithError(err).Error("unable to patch data download")

// we don't expect anyone else update the CR during the Prepare process
updated, err := r.exclusiveUpdateDataDownload(context.Background(), dd, r.prepareDataDownload)
if err != nil || !updated {
r.logger.WithFields(logrus.Fields{
"Datadownload": dd.Name,
"Restore pod": pod.Name,
"updated": updated,
}).WithError(err).Warn("failed to patch datadownload, prepare will halt for this datadownload")
return []reconcile.Request{}
}

Expand All @@ -416,16 +438,6 @@ func (r *DataDownloadReconciler) findSnapshotRestoreForPod(podObj client.Object)
return requests
}

func (r *DataDownloadReconciler) patchDataDownload(ctx context.Context, req *velerov2alpha1api.DataDownload, mutate func(*velerov2alpha1api.DataDownload)) error {
original := req.DeepCopy()
mutate(req)
if err := r.client.Patch(ctx, req, client.MergeFrom(original)); err != nil {
return errors.Wrap(err, "error patching data download")
}

return nil
}

func (r *DataDownloadReconciler) prepareDataDownload(ssb *velerov2alpha1api.DataDownload) {
ssb.Status.Phase = velerov2alpha1api.DataDownloadPhasePrepared
ssb.Status.Node = r.nodeName
Expand Down Expand Up @@ -453,17 +465,62 @@ func (r *DataDownloadReconciler) updateStatusToFailed(ctx context.Context, dd *v
}

func (r *DataDownloadReconciler) acceptDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload) (bool, error) {
updated := dd.DeepCopy()
updated.Status.Phase = velerov2alpha1api.DataDownloadPhaseAccepted
r.logger.Infof("Accepting data download %s", dd.Name)

r.logger.Infof("Accepting snapshot restore %s", dd.Name)
// For all data download controller in each node-agent will try to update download CR, and only one controller will success,
// and the success one could handle later logic
succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dd *velerov2alpha1api.DataDownload) {
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseAccepted
dd.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
})

if err != nil {
return false, err
}

if succeeded {
r.logger.WithField("DataDownload", dd.Name).Infof("This datadownload has been accepted by %s", r.nodeName)
return true, nil
}

r.logger.WithField("DataDownload", dd.Name).Info("This datadownload has been accepted by others")
return false, nil
}

func (r *DataDownloadReconciler) onPrepareTimeout(ctx context.Context, dd *velerov2alpha1api.DataDownload) {
log := r.logger.WithField("DataDownload", dd.Name)

log.Info("Timeout happened for preparing datadownload")

succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dd *velerov2alpha1api.DataDownload) {
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseFailed
dd.Status.Message = "timeout on preparing data download"
})

if err != nil {
log.WithError(err).Warn("Failed to update datadownload")
return
}

if !succeeded {
log.Warn("Dataupload has been updated by others")
return
}

r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd))

log.Info("Dataupload has been cleaned up")
}

func (r *DataDownloadReconciler) exclusiveUpdateDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload,
updateFunc func(*velerov2alpha1api.DataDownload)) (bool, error) {
updated := dd.DeepCopy()
updateFunc(updated)

err := r.client.Update(ctx, updated)
if err == nil {
return true, nil
} else if apierrors.IsConflict(err) {
r.logger.WithField("DataDownload", dd.Name).Error("This data download restore has been accepted by others")
return false, nil
} else {
return false, err
Expand Down
Loading

0 comments on commit bf2a981

Please sign in to comment.