Skip to content

Commit

Permalink
Merge pull request #8115 from Lyndon-Li/data-mover-ms-smoking-test
Browse files Browse the repository at this point in the history
Data mover micro service smoke testing
  • Loading branch information
Lyndon-Li authored Aug 21, 2024
2 parents ec6090b + 0ed1a7f commit f63b714
Show file tree
Hide file tree
Showing 18 changed files with 407 additions and 329 deletions.
2 changes: 2 additions & 0 deletions pkg/cmd/cli/datamover/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ func (s *dataMoverBackup) runDataPath() {

err = dpService.Init()
if err != nil {
dpService.Shutdown()
s.cancelFunc()
funcExitWithMessage(s.logger, false, "Failed to init data path service for DataUpload %s: %v", s.config.duName, err)
return
Expand All @@ -233,6 +234,7 @@ func (s *dataMoverBackup) runDataPath() {

result, err := dpService.RunCancelableDataPath(s.ctx)
if err != nil {
dpService.Shutdown()
s.cancelFunc()
funcExitWithMessage(s.logger, false, "Failed to run data path service for DataUpload %s: %v", s.config.duName, err)
return
Expand Down
2 changes: 2 additions & 0 deletions pkg/cmd/cli/datamover/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,13 +215,15 @@ func (s *dataMoverRestore) runDataPath() {

err = dpService.Init()
if err != nil {
dpService.Shutdown()
s.cancelFunc()
funcExitWithMessage(s.logger, false, "Failed to init data path service for DataDownload %s: %v", s.config.ddName, err)
return
}

result, err := dpService.RunCancelableDataPath(s.ctx)
if err != nil {
dpService.Shutdown()
s.cancelFunc()
funcExitWithMessage(s.logger, false, "Failed to run data path service for DataDownload %s: %v", s.config.ddName, err)
return
Expand Down
44 changes: 35 additions & 9 deletions pkg/cmd/cli/nodeagent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ import (
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/logging"

cacheutil "k8s.io/client-go/tools/cache"
)

var (
Expand Down Expand Up @@ -300,28 +302,29 @@ func (s *nodeAgentServer) run() {
backupPVCConfig = s.dataPathConfigs.BackupPVCConfig
}

dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, backupPVCConfig, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, backupPVCConfig, clock.RealClock{}, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data upload controller")
}

dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data download controller")
}

go func() {
s.mgr.GetCache().WaitForCacheSync(s.ctx)

if err := dataUploadReconciler.AttemptDataUploadResume(s.ctx, s.mgr.GetClient(), s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data upload resume")
if err := s.waitCacheForResume(); err != nil {
s.logger.WithError(err).Error("Failed to wait cache for resume, will not resume DU/DD")
return
}

if err := dataDownloadReconciler.AttemptDataDownloadResume(s.ctx, s.mgr.GetClient(), s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data download resume")
if err := dataUploadReconciler.AttemptDataUploadResume(s.ctx, s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
s.logger.WithError(errors.WithStack(err)).Error("Failed to attempt data upload resume")
}

s.logger.Info("Attempt complete to resume dataUploads and dataDownloads")
if err := dataDownloadReconciler.AttemptDataDownloadResume(s.ctx, s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
s.logger.WithError(errors.WithStack(err)).Error("Failed to attempt data download resume")
}
}()

s.logger.Info("Controllers starting...")
Expand All @@ -331,6 +334,29 @@ func (s *nodeAgentServer) run() {
}
}

func (s *nodeAgentServer) waitCacheForResume() error {
podInformer, err := s.mgr.GetCache().GetInformer(s.ctx, &v1.Pod{})
if err != nil {
return errors.Wrap(err, "error getting pod informer")
}

duInformer, err := s.mgr.GetCache().GetInformer(s.ctx, &velerov2alpha1api.DataUpload{})
if err != nil {
return errors.Wrap(err, "error getting du informer")
}

ddInformer, err := s.mgr.GetCache().GetInformer(s.ctx, &velerov2alpha1api.DataDownload{})
if err != nil {
return errors.Wrap(err, "error getting dd informer")
}

if !cacheutil.WaitForCacheSync(s.ctx.Done(), podInformer.HasSynced, duInformer.HasSynced, ddInformer.HasSynced) {
return errors.New("error waiting informer synced")
}

return nil
}

// validatePodVolumesHostPath validates that the pod volumes path contains a
// directory for each Pod running on this node
func (s *nodeAgentServer) validatePodVolumesHostPath(client kubernetes.Interface) error {
Expand Down
132 changes: 69 additions & 63 deletions pkg/controller/data_download_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,53 +39,44 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/vmware-tanzu/velero/internal/credentials"
"github.com/vmware-tanzu/velero/pkg/apis/velero/shared"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
datamover "github.com/vmware-tanzu/velero/pkg/datamover"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/exposer"
"github.com/vmware-tanzu/velero/pkg/metrics"
repository "github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/kube"
)

// DataDownloadReconciler reconciles a DataDownload object
type DataDownloadReconciler struct {
client client.Client
kubeClient kubernetes.Interface
mgr manager.Manager
logger logrus.FieldLogger
credentialGetter *credentials.CredentialGetter
fileSystem filesystem.Interface
Clock clock.WithTickerAndDelayedExecution
restoreExposer exposer.GenericRestoreExposer
nodeName string
repositoryEnsurer *repository.Ensurer
dataPathMgr *datapath.Manager
preparingTimeout time.Duration
metrics *metrics.ServerMetrics
client client.Client
kubeClient kubernetes.Interface
mgr manager.Manager
logger logrus.FieldLogger
Clock clock.WithTickerAndDelayedExecution
restoreExposer exposer.GenericRestoreExposer
nodeName string
dataPathMgr *datapath.Manager
preparingTimeout time.Duration
metrics *metrics.ServerMetrics
}

func NewDataDownloadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager,
repoEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler {
nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler {
return &DataDownloadReconciler{
client: client,
kubeClient: kubeClient,
mgr: mgr,
logger: logger.WithField("controller", "DataDownload"),
credentialGetter: credentialGetter,
fileSystem: filesystem.NewFileSystem(),
Clock: &clock.RealClock{},
nodeName: nodeName,
repositoryEnsurer: repoEnsurer,
restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger),
dataPathMgr: dataPathMgr,
preparingTimeout: preparingTimeout,
metrics: metrics,
client: client,
kubeClient: kubeClient,
mgr: mgr,
logger: logger.WithField("controller", "DataDownload"),
Clock: &clock.RealClock{},
nodeName: nodeName,
restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger),
dataPathMgr: dataPathMgr,
preparingTimeout: preparingTimeout,
metrics: metrics,
}
}

Expand Down Expand Up @@ -225,9 +216,9 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted {
if dd.Spec.Cancel {
log.Debugf("Data download is been canceled %s in Phase %s", dd.GetName(), dd.Status.Phase)
r.TryCancelDataDownload(ctx, dd, "")
r.tryCancelAcceptedDataDownload(ctx, dd, "")
} else if peekErr := r.restoreExposer.PeekExposed(ctx, getDataDownloadOwnerObject(dd)); peekErr != nil {
r.TryCancelDataDownload(ctx, dd, fmt.Sprintf("found a dataupload %s/%s with expose error: %s. mark it as cancel", dd.Namespace, dd.Name, peekErr))
r.tryCancelAcceptedDataDownload(ctx, dd, fmt.Sprintf("found a dataupload %s/%s with expose error: %s. mark it as cancel", dd.Namespace, dd.Name, peekErr))
log.Errorf("Cancel dd %s/%s because of expose error %s", dd.Namespace, dd.Name, peekErr)
} else if dd.Status.StartTimestamp != nil {
if time.Since(dd.Status.StartTimestamp.Time) >= r.preparingTimeout {
Expand Down Expand Up @@ -280,23 +271,35 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
return r.errorOut(ctx, dd, err, "error to create data path", log)
}
}

if err := r.initCancelableDataPath(ctx, asyncBR, result, log); err != nil {
log.WithError(err).Errorf("Failed to init cancelable data path for %s", dd.Name)

r.closeDataPath(ctx, dd.Name)
return r.errorOut(ctx, dd, err, "error initializing data path", log)
}

// Update status to InProgress
original := dd.DeepCopy()
dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress
dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil {
log.WithError(err).Error("Unable to update status to in progress")
return ctrl.Result{}, err
log.WithError(err).Warnf("Failed to update datadownload %s to InProgress, will close data path and retry", dd.Name)

r.closeDataPath(ctx, dd.Name)
return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
}

log.Info("Data download is marked as in progress")

reconcileResult, err := r.runCancelableDataPath(ctx, asyncBR, dd, result, log)
if err != nil {
log.Errorf("Failed to run cancelable data path for %s with err %v", dd.Name, err)
if err := r.startCancelableDataPath(asyncBR, dd, result, log); err != nil {
log.WithError(err).Errorf("Failed to start cancelable data path for %s", dd.Name)

r.closeDataPath(ctx, dd.Name)
return r.errorOut(ctx, dd, err, "error starting data path", log)
}
return reconcileResult, err

return ctrl.Result{}, nil
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
log.Info("Data download is in progress")
if dd.Spec.Cancel {
Expand Down Expand Up @@ -339,27 +342,33 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
}
}

func (r *DataDownloadReconciler) runCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, dd *velerov2alpha1api.DataDownload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) {
func (r *DataDownloadReconciler) initCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, res *exposer.ExposeResult, log logrus.FieldLogger) error {
log.Info("Init cancelable dataDownload")

if err := asyncBR.Init(ctx, nil); err != nil {
return r.errorOut(ctx, dd, err, "error to initialize asyncBR", log)
return errors.Wrap(err, "error initializing asyncBR")
}

log.Infof("async restore init for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName)

return nil
}

func (r *DataDownloadReconciler) startCancelableDataPath(asyncBR datapath.AsyncBR, dd *velerov2alpha1api.DataDownload, res *exposer.ExposeResult, log logrus.FieldLogger) error {
log.Info("Start cancelable dataDownload")

if err := asyncBR.StartRestore(dd.Spec.SnapshotID, datapath.AccessPoint{
ByPath: res.ByPod.VolumeName,
}, dd.Spec.DataMoverConfig); err != nil {
return r.errorOut(ctx, dd, err, fmt.Sprintf("error starting async restore for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName), log)
return errors.Wrapf(err, "error starting async restore for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName)
}

log.Infof("Async restore started for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName)
return ctrl.Result{}, nil
log.Infof("Async restore started for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName)
return nil
}

func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, namespace string, ddName string, result datapath.Result) {
defer func() {
go r.closeDataPath(ctx, ddName)
}()
defer r.dataPathMgr.RemoveAsyncBR(ddName)

log := r.logger.WithField("datadownload", ddName)
log.Info("Async fs restore data path completed")
Expand Down Expand Up @@ -392,9 +401,7 @@ func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, na
}

func (r *DataDownloadReconciler) OnDataDownloadFailed(ctx context.Context, namespace string, ddName string, err error) {
defer func() {
go r.closeDataPath(ctx, ddName)
}()
defer r.dataPathMgr.RemoveAsyncBR(ddName)

log := r.logger.WithField("datadownload", ddName)

Expand All @@ -404,16 +411,12 @@ func (r *DataDownloadReconciler) OnDataDownloadFailed(ctx context.Context, names
if getErr := r.client.Get(ctx, types.NamespacedName{Name: ddName, Namespace: namespace}, &dd); getErr != nil {
log.WithError(getErr).Warn("Failed to get data download on failure")
} else {
if _, errOut := r.errorOut(ctx, &dd, err, "data path restore failed", log); err != nil {
log.WithError(err).Warnf("Failed to patch data download with err %v", errOut)
}
_, _ = r.errorOut(ctx, &dd, err, "data path restore failed", log)
}
}

func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, namespace string, ddName string) {
defer func() {
go r.closeDataPath(ctx, ddName)
}()
defer r.dataPathMgr.RemoveAsyncBR(ddName)

log := r.logger.WithField("datadownload", ddName)

Expand All @@ -440,17 +443,20 @@ func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, na
}
}

func (r *DataDownloadReconciler) TryCancelDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload, message string) {
func (r *DataDownloadReconciler) tryCancelAcceptedDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload, message string) {
log := r.logger.WithField("datadownload", dd.Name)
log.Warn("Async fs backup data path canceled")
log.Warn("Accepted data download is canceled")

succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dataDownload *velerov2alpha1api.DataDownload) {
dataDownload.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceled
if dataDownload.Status.StartTimestamp.IsZero() {
dataDownload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()}
}
dataDownload.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
dataDownload.Status.Message = message

if message != "" {
dataDownload.Status.Message = message
}
})

if err != nil {
Expand All @@ -464,7 +470,6 @@ func (r *DataDownloadReconciler) TryCancelDataDownload(ctx context.Context, dd *
// success update
r.metrics.RegisterDataDownloadCancel(r.nodeName)
r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd))
r.closeDataPath(ctx, dd.Name)
}

func (r *DataDownloadReconciler) OnDataDownloadProgress(ctx context.Context, namespace string, ddName string, progress *uploader.Progress) {
Expand Down Expand Up @@ -762,9 +767,9 @@ func UpdateDataDownloadWithRetry(ctx context.Context, client client.Client, name

var funcResumeCancellableDataRestore = (*DataDownloadReconciler).resumeCancellableDataPath

func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context, cli client.Client, logger *logrus.Entry, ns string) error {
func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context, logger *logrus.Entry, ns string) error {
dataDownloads := &velerov2alpha1api.DataDownloadList{}
if err := cli.List(ctx, dataDownloads, &client.ListOptions{Namespace: ns}); err != nil {
if err := r.client.List(ctx, dataDownloads, &client.ListOptions{Namespace: ns}); err != nil {
r.logger.WithError(errors.WithStack(err)).Error("failed to list datadownloads")
return errors.Wrapf(err, "error to list datadownloads")
}
Expand All @@ -783,13 +788,14 @@ func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context,

err := funcResumeCancellableDataRestore(r, ctx, dd, logger)
if err == nil {
logger.WithField("dd", dd.Name).WithField("current node", r.nodeName).Info("Completed to resume in progress DD")
continue
}

logger.WithField("datadownload", dd.GetName()).WithError(err).Warn("Failed to resume data path for dd, have to cancel it")

resumeErr := err
err = UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, logger.WithField("datadownload", dd.Name),
err = UpdateDataDownloadWithRetry(ctx, r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, logger.WithField("datadownload", dd.Name),
func(dataDownload *velerov2alpha1api.DataDownload) bool {
if dataDownload.Spec.Cancel {
return false
Expand All @@ -806,7 +812,7 @@ func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context,
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted {
r.logger.WithField("datadownload", dd.GetName()).Warn("Cancel dd under Accepted phase")

err := UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name},
err := UpdateDataDownloadWithRetry(ctx, r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name},
r.logger.WithField("datadownload", dd.Name), func(dataDownload *velerov2alpha1api.DataDownload) bool {
if dataDownload.Spec.Cancel {
return false
Expand Down
Loading

0 comments on commit f63b714

Please sign in to comment.