Skip to content

Commit

Permalink
Merge pull request #8074 from Lyndon-Li/data-mover-ms-new-controller-1
Browse files Browse the repository at this point in the history
Data mover micro service new controller
  • Loading branch information
Lyndon-Li authored Aug 7, 2024
2 parents 2645948 + 903458b commit dd3d05b
Show file tree
Hide file tree
Showing 12 changed files with 117 additions and 109 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/8074-Lyndon-Li
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Data mover micro service DUCR/DDCR controller refactor according to design #7576
6 changes: 6 additions & 0 deletions pkg/builder/data_download_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ func (d *DataDownloadBuilder) ObjectMeta(opts ...ObjectMetaOpt) *DataDownloadBui
return d
}

// Labels sets the DataDownload's Labels.
func (d *DataDownloadBuilder) Labels(labels map[string]string) *DataDownloadBuilder {
d.object.Labels = labels
return d
}

// StartTimestamp sets the DataDownload's StartTimestamp.
func (d *DataDownloadBuilder) StartTimestamp(startTime *metav1.Time) *DataDownloadBuilder {
d.object.Status.StartTimestamp = startTime
Expand Down
3 changes: 1 addition & 2 deletions pkg/cmd/cli/datamover/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,7 @@ func (s *dataMoverBackup) run() {
}
}()

// TODOOO: call s.runDataPath()
time.Sleep(time.Duration(1<<63 - 1))
s.runDataPath()
}

func (s *dataMoverBackup) runDataPath() {
Expand Down
3 changes: 1 addition & 2 deletions pkg/cmd/cli/datamover/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,7 @@ func (s *dataMoverRestore) run() {
}
}()

// TODOOO: call s.runDataPath()
time.Sleep(time.Duration(1<<63 - 1))
s.runDataPath()
}

func (s *dataMoverRestore) runDataPath() {
Expand Down
6 changes: 3 additions & 3 deletions pkg/cmd/cli/nodeagent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func NewServerCommand(f client.Factory) *cobra.Command {
logLevel := logLevelFlag.Parse()
logrus.Infof("Setting log-level to %s", strings.ToUpper(logLevel.String()))

logger := logging.DefaultLogger(logLevel, formatFlag.Parse())
logger := logging.DefaultMergeLogger(logLevel, formatFlag.Parse())
logger.Infof("Starting Velero node-agent server %s (%s)", buildinfo.Version, buildinfo.FormattedGitSHA())

f.SetBasename(fmt.Sprintf("%s-%s", c.Parent().Name(), c.Name()))
Expand Down Expand Up @@ -292,13 +292,13 @@ func (s *nodeAgentServer) run() {
if s.dataPathConfigs != nil && len(s.dataPathConfigs.LoadAffinity) > 0 {
loadAffinity = s.dataPathConfigs.LoadAffinity[0]
}
dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
s.attemptDataUploadResume(dataUploadReconciler)
if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data upload controller")
}

dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
s.attemptDataDownloadResume(dataDownloadReconciler)
if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data download controller")
Expand Down
71 changes: 34 additions & 37 deletions pkg/controller/data_download_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

Expand All @@ -56,6 +57,7 @@ import (
type DataDownloadReconciler struct {
client client.Client
kubeClient kubernetes.Interface
mgr manager.Manager
logger logrus.FieldLogger
credentialGetter *credentials.CredentialGetter
fileSystem filesystem.Interface
Expand All @@ -68,11 +70,12 @@ type DataDownloadReconciler struct {
metrics *metrics.ServerMetrics
}

func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager,
func NewDataDownloadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager,
repoEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler {
return &DataDownloadReconciler{
client: client,
kubeClient: kubeClient,
mgr: mgr,
logger: logger.WithField("controller", "DataDownload"),
credentialGetter: credentialGetter,
fileSystem: filesystem.NewFileSystem(),
Expand Down Expand Up @@ -234,9 +237,9 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
return ctrl.Result{}, nil
}

fsRestore := r.dataPathMgr.GetAsyncBR(dd.Name)
asyncBR := r.dataPathMgr.GetAsyncBR(dd.Name)

if fsRestore != nil {
if asyncBR != nil {
log.Info("Cancellable data path is already started")
return ctrl.Result{}, nil
}
Expand All @@ -259,7 +262,8 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
OnProgress: r.OnDataDownloadProgress,
}

fsRestore, err = r.dataPathMgr.CreateFileSystemBR(dd.Name, dataUploadDownloadRequestor, ctx, r.client, dd.Namespace, callbacks, log)
asyncBR, err = r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, r.kubeClient, r.mgr, datapath.TaskTypeRestore,
dd.Name, dd.Namespace, result.ByPod.HostingPod.Name, result.ByPod.HostingContainer, dd.Name, callbacks, false, log)
if err != nil {
if err == datapath.ConcurrentLimitExceed {
log.Info("Data path instance is concurrent limited requeue later")
Expand All @@ -279,7 +283,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request

log.Info("Data download is marked as in progress")

reconcileResult, err := r.runCancelableDataPath(ctx, fsRestore, dd, result, log)
reconcileResult, err := r.runCancelableDataPath(ctx, asyncBR, dd, result, log)
if err != nil {
log.Errorf("Failed to run cancelable data path for %s with err %v", dd.Name, err)
r.closeDataPath(ctx, dd.Name)
Expand All @@ -289,8 +293,8 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
log.Info("Data download is in progress")
if dd.Spec.Cancel {
log.Info("Data download is being canceled")
fsRestore := r.dataPathMgr.GetAsyncBR(dd.Name)
if fsRestore == nil {
asyncBR := r.dataPathMgr.GetAsyncBR(dd.Name)
if asyncBR == nil {
if r.nodeName == dd.Status.Node {
r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName())
} else {
Expand All @@ -306,7 +310,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
log.WithError(err).Error("error updating data download status")
return ctrl.Result{}, err
}
fsRestore.Cancel()
asyncBR.Cancel()
return ctrl.Result{}, nil
}

Expand All @@ -327,38 +331,27 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
}
}

func (r *DataDownloadReconciler) runCancelableDataPath(ctx context.Context, fsRestore datapath.AsyncBR, dd *velerov2alpha1api.DataDownload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) {
path, err := exposer.GetPodVolumeHostPath(ctx, res.ByPod.HostingPod, res.ByPod.VolumeName, r.client, r.fileSystem, log)
if err != nil {
return r.errorOut(ctx, dd, err, "error exposing host path for pod volume", log)
}

log.WithField("path", path.ByPath).Debug("Found host path")

if err := fsRestore.Init(ctx, &datapath.FSBRInitParam{
BSLName: dd.Spec.BackupStorageLocation,
SourceNamespace: dd.Spec.SourceNamespace,
UploaderType: datamover.GetUploaderType(dd.Spec.DataMover),
RepositoryType: velerov1api.BackupRepositoryTypeKopia,
RepoIdentifier: "",
RepositoryEnsurer: r.repositoryEnsurer,
CredentialGetter: r.credentialGetter,
}); err != nil {
return r.errorOut(ctx, dd, err, "error to initialize data path", log)
func (r *DataDownloadReconciler) runCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, dd *velerov2alpha1api.DataDownload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) {
if err := asyncBR.Init(ctx, nil); err != nil {
return r.errorOut(ctx, dd, err, "error to initialize asyncBR", log)
}

log.WithField("path", path.ByPath).Info("fs init")
log.Infof("async restore init for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName)

if err := fsRestore.StartRestore(dd.Spec.SnapshotID, path, dd.Spec.DataMoverConfig); err != nil {
return r.errorOut(ctx, dd, err, fmt.Sprintf("error starting data path %s restore", path.ByPath), log)
if err := asyncBR.StartRestore(dd.Spec.SnapshotID, datapath.AccessPoint{
ByPath: res.ByPod.VolumeName,
}, dd.Spec.DataMoverConfig); err != nil {
return r.errorOut(ctx, dd, err, fmt.Sprintf("error starting async restore for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName), log)
}

log.WithField("path", path.ByPath).Info("Async fs restore data path started")
log.Infof("Async restore started for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName)
return ctrl.Result{}, nil
}

func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, namespace string, ddName string, result datapath.Result) {
defer r.closeDataPath(ctx, ddName)
defer func() {
go r.closeDataPath(ctx, ddName)
}()

log := r.logger.WithField("datadownload", ddName)
log.Info("Async fs restore data path completed")
Expand Down Expand Up @@ -391,7 +384,9 @@ func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, na
}

func (r *DataDownloadReconciler) OnDataDownloadFailed(ctx context.Context, namespace string, ddName string, err error) {
defer r.closeDataPath(ctx, ddName)
defer func() {
go r.closeDataPath(ctx, ddName)
}()

log := r.logger.WithField("datadownload", ddName)

Expand All @@ -408,7 +403,9 @@ func (r *DataDownloadReconciler) OnDataDownloadFailed(ctx context.Context, names
}

func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, namespace string, ddName string) {
defer r.closeDataPath(ctx, ddName)
defer func() {
go r.closeDataPath(ctx, ddName)
}()

log := r.logger.WithField("datadownload", ddName)

Expand Down Expand Up @@ -561,7 +558,7 @@ func (r *DataDownloadReconciler) findSnapshotRestoreForPod(ctx context.Context,
log.WithError(err).Warn("failed to cancel datadownload, and it will wait for prepare timeout")
return []reconcile.Request{}
}
log.Info("Exposed pod is in abnormal status, and datadownload is marked as cancel")
log.Infof("Exposed pod is in abnormal status(reason %s) and datadownload is marked as cancel", reason)
} else {
return []reconcile.Request{}
}
Expand Down Expand Up @@ -754,9 +751,9 @@ func (r *DataDownloadReconciler) getTargetPVC(ctx context.Context, dd *velerov2a
}

func (r *DataDownloadReconciler) closeDataPath(ctx context.Context, ddName string) {
fsBackup := r.dataPathMgr.GetAsyncBR(ddName)
if fsBackup != nil {
fsBackup.Close(ctx)
asyncBR := r.dataPathMgr.GetAsyncBR(ddName)
if asyncBR != nil {
asyncBR.Close(ctx)
}

r.dataPathMgr.RemoveAsyncBR(ddName)
Expand Down
26 changes: 11 additions & 15 deletions pkg/controller/data_download_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
clientgofake "k8s.io/client-go/kubernetes/fake"
ctrl "sigs.k8s.io/controller-runtime"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"sigs.k8s.io/controller-runtime/pkg/client/fake"
Expand Down Expand Up @@ -149,7 +151,7 @@ func initDataDownloadReconcilerWithError(objects []runtime.Object, needError ...

dataPathMgr := datapath.NewManager(1)

return NewDataDownloadReconciler(fakeClient, fakeKubeClient, dataPathMgr, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
return NewDataDownloadReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
}

func TestDataDownloadReconcile(t *testing.T) {
Expand Down Expand Up @@ -261,14 +263,6 @@ func TestDataDownloadReconcile(t *testing.T) {
notMockCleanUp: true,
expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5},
},
{
name: "Error getting volume directory name for pvc in pod",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
notNilExpose: true,
mockClose: true,
expectedStatusMsg: "error identifying unique volume path on host",
},
{
name: "Unable to update status to in progress for data download",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
Expand Down Expand Up @@ -402,17 +396,18 @@ func TestDataDownloadReconcile(t *testing.T) {
r.dataPathMgr = datapath.NewManager(1)
}

datapath.FSBRCreator = func(string, string, kbclient.Client, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
fsBR := datapathmockes.NewAsyncBR(t)
datapath.MicroServiceBRWatcherCreator = func(kbclient.Client, kubernetes.Interface, manager.Manager, string, string,
string, string, string, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
asyncBR := datapathmockes.NewAsyncBR(t)
if test.mockCancel {
fsBR.On("Cancel").Return()
asyncBR.On("Cancel").Return()
}

if test.mockClose {
fsBR.On("Close", mock.Anything).Return()
asyncBR.On("Close", mock.Anything).Return()
}

return fsBR
return asyncBR
}

if test.isExposeErr || test.isGetExposeErr || test.isPeekExposeErr || test.isNilExposer || test.notNilExpose {
Expand Down Expand Up @@ -443,7 +438,8 @@ func TestDataDownloadReconcile(t *testing.T) {

if test.needCreateFSBR {
if fsBR := r.dataPathMgr.GetAsyncBR(test.dd.Name); fsBR == nil {
_, err := r.dataPathMgr.CreateFileSystemBR(test.dd.Name, pVBRRequestor, ctx, r.client, velerov1api.DefaultNamespace, datapath.Callbacks{OnCancelled: r.OnDataDownloadCancelled}, velerotest.NewLogger())
_, err := r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, nil, nil, datapath.TaskTypeRestore, test.dd.Name, pVBRRequestor,
velerov1api.DefaultNamespace, "", "", datapath.Callbacks{OnCancelled: r.OnDataDownloadCancelled}, false, velerotest.NewLogger())
require.NoError(t, err)
}
}
Expand Down
Loading

0 comments on commit dd3d05b

Please sign in to comment.