Skip to content

Commit

Permalink
Add --privileged-datamover-pods option to installer and node agent
Browse files Browse the repository at this point in the history
  • Loading branch information
sseago committed Sep 24, 2024
1 parent 11f771f commit cc4d1ce
Show file tree
Hide file tree
Showing 13 changed files with 143 additions and 61 deletions.
3 changes: 3 additions & 0 deletions pkg/cmd/cli/install/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type Options struct {
VolumeSnapshotConfig flag.Map
UseNodeAgent bool
PrivilegedNodeAgent bool
PrivilegedDatamoverPods bool
//TODO remove UseRestic when migration test out of using it
UseRestic bool
Wait bool
Expand Down Expand Up @@ -120,6 +121,7 @@ func (o *Options) BindFlags(flags *pflag.FlagSet) {
flags.BoolVar(&o.DryRun, "dry-run", o.DryRun, "Generate resources, but don't send them to the cluster. Use with -o. Optional.")
flags.BoolVar(&o.UseNodeAgent, "use-node-agent", o.UseNodeAgent, "Create Velero node-agent daemonset. Optional. Velero node-agent hosts Velero modules that need to run in one or more nodes(i.e. Restic, Kopia).")
flags.BoolVar(&o.PrivilegedNodeAgent, "privileged-node-agent", o.PrivilegedNodeAgent, "Use privileged mode for the node agent. Optional. Required to backup block devices.")
flags.BoolVar(&o.PrivilegedDatamoverPods, "privileged-datamover-pods", o.PrivilegedDatamoverPods, "Use privileged mode for the datamover pods. Optional.")
flags.BoolVar(&o.Wait, "wait", o.Wait, "Wait for Velero deployment to be ready. Optional.")
flags.DurationVar(&o.DefaultRepoMaintenanceFrequency, "default-repo-maintain-frequency", o.DefaultRepoMaintenanceFrequency, "How often 'maintain' is run for backup repositories by default. Optional.")
flags.DurationVar(&o.GarbageCollectionFrequency, "garbage-collection-frequency", o.GarbageCollectionFrequency, "How often the garbage collection runs for expired backups.(default 1h)")
Expand Down Expand Up @@ -263,6 +265,7 @@ func (o *Options) AsVeleroOptions() (*install.VeleroOptions, error) {
RestoreOnly: o.RestoreOnly,
UseNodeAgent: o.UseNodeAgent,
PrivilegedNodeAgent: o.PrivilegedNodeAgent,
PrivilegedDatamoverPods: o.PrivilegedDatamoverPods,
UseVolumeSnapshots: o.UseVolumeSnapshots,
BSLConfig: o.BackupStorageConfig.Data(),
VSLConfig: o.VolumeSnapshotConfig.Data(),
Expand Down
16 changes: 15 additions & 1 deletion pkg/cmd/cli/nodeagent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type nodeAgentServerConfig struct {
resourceTimeout time.Duration
dataMoverPrepareTimeout time.Duration
nodeAgentConfig string
privilegedDatamoverPods bool
}

func NewServerCommand(f client.Factory) *cobra.Command {
Expand Down Expand Up @@ -126,6 +127,7 @@ func NewServerCommand(f client.Factory) *cobra.Command {
command.Flags().DurationVar(&config.dataMoverPrepareTimeout, "data-mover-prepare-timeout", config.dataMoverPrepareTimeout, "How long to wait for preparing a DataUpload/DataDownload. Default is 30 minutes.")
command.Flags().StringVar(&config.metricsAddress, "metrics-address", config.metricsAddress, "The address to expose prometheus metrics")
command.Flags().StringVar(&config.nodeAgentConfig, "node-agent-configmap", config.nodeAgentConfig, "The name of ConfigMap containing node-agent configurations.")
command.Flags().BoolVar(&config.privilegedDatamoverPods, "privileged-datamover-pods", config.privilegedDatamoverPods, "Use privileged mode for the datamover pods. Optional.")

return command
}
Expand Down Expand Up @@ -330,12 +332,24 @@ func (s *nodeAgentServer) run() {
s.config.dataMoverPrepareTimeout,
s.logger,
s.metrics,
s.config.privilegedDatamoverPods,
)
if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data upload controller")
}

dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, podResources, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
dataDownloadReconciler := controller.NewDataDownloadReconciler(
s.mgr.GetClient(),
s.mgr,
s.kubeClient,
s.dataPathMgr,
podResources,
s.nodeName,
s.config.dataMoverPrepareTimeout,
s.logger,
s.metrics,
s.config.privilegedDatamoverPods,
)
if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data download controller")
}
Expand Down
62 changes: 37 additions & 25 deletions pkg/controller/data_download_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,33 +53,45 @@ import (

// DataDownloadReconciler reconciles a DataDownload object
type DataDownloadReconciler struct {
client client.Client
kubeClient kubernetes.Interface
mgr manager.Manager
logger logrus.FieldLogger
Clock clock.WithTickerAndDelayedExecution
restoreExposer exposer.GenericRestoreExposer
nodeName string
dataPathMgr *datapath.Manager
podResources v1.ResourceRequirements
preparingTimeout time.Duration
metrics *metrics.ServerMetrics
client client.Client
kubeClient kubernetes.Interface
mgr manager.Manager
logger logrus.FieldLogger
Clock clock.WithTickerAndDelayedExecution
restoreExposer exposer.GenericRestoreExposer
nodeName string
dataPathMgr *datapath.Manager
podResources v1.ResourceRequirements
preparingTimeout time.Duration
metrics *metrics.ServerMetrics
privilegedDatamoverPods bool
}

func NewDataDownloadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager,
podResources v1.ResourceRequirements, nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler {
func NewDataDownloadReconciler(
client client.Client,
mgr manager.Manager,
kubeClient kubernetes.Interface,
dataPathMgr *datapath.Manager,
podResources v1.ResourceRequirements,
nodeName string,
preparingTimeout time.Duration,
logger logrus.FieldLogger,
metrics *metrics.ServerMetrics,
privilegedDatamoverPods bool,
) *DataDownloadReconciler {
return &DataDownloadReconciler{
client: client,
kubeClient: kubeClient,
mgr: mgr,
logger: logger.WithField("controller", "DataDownload"),
Clock: &clock.RealClock{},
nodeName: nodeName,
restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger),
dataPathMgr: dataPathMgr,
podResources: podResources,
preparingTimeout: preparingTimeout,
metrics: metrics,
client: client,
kubeClient: kubeClient,
mgr: mgr,
logger: logger.WithField("controller", "DataDownload"),
Clock: &clock.RealClock{},
nodeName: nodeName,
restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger),
dataPathMgr: dataPathMgr,
podResources: podResources,
preparingTimeout: preparingTimeout,
metrics: metrics,
privilegedDatamoverPods: privilegedDatamoverPods,
}
}

Expand Down Expand Up @@ -182,7 +194,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
// Expose() will trigger to create one pod whose volume is restored by a given volume snapshot,
// but the pod maybe is not in the same node of the current controller, so we need to return it here.
// And then only the controller who is in the same node could do the rest work.
err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), dd.Spec.TargetVolume.PVC, dd.Spec.TargetVolume.Namespace, hostingPodLabels, r.podResources, dd.Spec.OperationTimeout.Duration)
err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), dd.Spec.TargetVolume.PVC, dd.Spec.TargetVolume.Namespace, hostingPodLabels, r.podResources, dd.Spec.OperationTimeout.Duration, r.privilegedDatamoverPods)
if err != nil {
if err := r.client.Get(ctx, req.NamespacedName, dd); err != nil {
if !apierrors.IsNotFound(err) {
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/data_download_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func initDataDownloadReconcilerWithError(objects []runtime.Object, needError ...

dataPathMgr := datapath.NewManager(1)

return NewDataDownloadReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, corev1.ResourceRequirements{}, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
return NewDataDownloadReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, corev1.ResourceRequirements{}, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics(), false), nil
}

func TestDataDownloadReconcile(t *testing.T) {
Expand Down Expand Up @@ -441,7 +441,7 @@ func TestDataDownloadReconcile(t *testing.T) {
r.restoreExposer = func() exposer.GenericRestoreExposer {
ep := exposermockes.NewGenericRestoreExposer(t)
if test.isExposeErr {
ep.On("Expose", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("Error to expose restore exposer"))
ep.On("Expose", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("Error to expose restore exposer"))
} else if test.notNilExpose {
hostingPod := builder.ForPod("test-ns", "test-name").Volumes(&corev1.Volume{Name: "test-pvc"}).Result()
hostingPod.ObjectMeta.SetUID("test-uid")
Expand Down Expand Up @@ -959,7 +959,7 @@ func (dt *ddResumeTestHelper) resumeCancellableDataPath(_ *DataUploadReconciler,
return dt.resumeErr
}

func (dt *ddResumeTestHelper) Expose(context.Context, corev1.ObjectReference, string, string, map[string]string, corev1.ResourceRequirements, time.Duration) error {
func (dt *ddResumeTestHelper) Expose(context.Context, corev1.ObjectReference, string, string, map[string]string, corev1.ResourceRequirements, time.Duration, bool) error {
return nil
}

Expand Down
44 changes: 24 additions & 20 deletions pkg/controller/data_upload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,21 @@ const (

// DataUploadReconciler reconciles a DataUpload object
type DataUploadReconciler struct {
client client.Client
kubeClient kubernetes.Interface
csiSnapshotClient snapshotter.SnapshotV1Interface
mgr manager.Manager
Clock clocks.WithTickerAndDelayedExecution
nodeName string
logger logrus.FieldLogger
snapshotExposerList map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer
dataPathMgr *datapath.Manager
loadAffinity *kube.LoadAffinity
backupPVCConfig map[string]nodeagent.BackupPVC
podResources corev1.ResourceRequirements
preparingTimeout time.Duration
metrics *metrics.ServerMetrics
client client.Client
kubeClient kubernetes.Interface
csiSnapshotClient snapshotter.SnapshotV1Interface
mgr manager.Manager
Clock clocks.WithTickerAndDelayedExecution
nodeName string
logger logrus.FieldLogger
snapshotExposerList map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer
dataPathMgr *datapath.Manager
loadAffinity *kube.LoadAffinity
backupPVCConfig map[string]nodeagent.BackupPVC
podResources corev1.ResourceRequirements
preparingTimeout time.Duration
metrics *metrics.ServerMetrics
privilegedDatamoverPods bool
}

func NewDataUploadReconciler(
Expand All @@ -92,6 +93,7 @@ func NewDataUploadReconciler(
preparingTimeout time.Duration,
log logrus.FieldLogger,
metrics *metrics.ServerMetrics,
privilegedDatamoverPods bool,
) *DataUploadReconciler {
return &DataUploadReconciler{
client: client,
Expand All @@ -108,12 +110,13 @@ func NewDataUploadReconciler(
log,
),
},
dataPathMgr: dataPathMgr,
loadAffinity: loadAffinity,
backupPVCConfig: backupPVCConfig,
podResources: podResources,
preparingTimeout: preparingTimeout,
metrics: metrics,
dataPathMgr: dataPathMgr,
loadAffinity: loadAffinity,
backupPVCConfig: backupPVCConfig,
podResources: podResources,
preparingTimeout: preparingTimeout,
metrics: metrics,
privilegedDatamoverPods: privilegedDatamoverPods,
}
}

Expand Down Expand Up @@ -816,6 +819,7 @@ func (r *DataUploadReconciler) setupExposeParam(du *velerov2alpha1api.DataUpload
Affinity: r.loadAffinity,
BackupPVCConfig: r.backupPVCConfig,
Resources: r.podResources,
Privileged: r.privilegedDatamoverPods,
}, nil
}
return nil, nil
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/data_upload_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci
time.Minute*5,
velerotest.NewLogger(),
metrics.NewServerMetrics(),
false,
), nil
}

Expand Down
11 changes: 11 additions & 0 deletions pkg/exposer/csi_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ type CSISnapshotExposeParam struct {

// Resources defines the resource requirements of the hosting pod
Resources corev1.ResourceRequirements

// Privileged determines whether to create a privileged pod
Privileged bool
}

// CSISnapshotExposeWaitParam define the input param for WaitExposed of CSI snapshots
Expand Down Expand Up @@ -202,6 +205,7 @@ func (e *csiSnapshotExposer) Expose(ctx context.Context, ownerObject corev1.Obje
csiExposeParam.HostingPodLabels,
csiExposeParam.Affinity,
csiExposeParam.Resources,
csiExposeParam.Privileged,
)
if err != nil {
return errors.Wrap(err, "error to create backup pod")
Expand Down Expand Up @@ -441,6 +445,7 @@ func (e *csiSnapshotExposer) createBackupPod(
label map[string]string,
affinity *kube.LoadAffinity,
resources corev1.ResourceRequirements,
privileged bool,
) (*corev1.Pod, error) {
podName := ownerObject.Name

Expand Down Expand Up @@ -550,5 +555,11 @@ func (e *csiSnapshotExposer) createBackupPod(
},
}

if privileged {
pod.Spec.Containers[0].SecurityContext = &corev1.SecurityContext{
Privileged: boolptr.True(),
}
}

return e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Create(ctx, pod, metav1.CreateOptions{})
}
33 changes: 28 additions & 5 deletions pkg/exposer/generic_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
// GenericRestoreExposer is the interfaces for a generic restore exposer
type GenericRestoreExposer interface {
// Expose starts the process to a restore expose, the expose process may take long time
Expose(context.Context, corev1.ObjectReference, string, string, map[string]string, corev1.ResourceRequirements, time.Duration) error
Expose(context.Context, corev1.ObjectReference, string, string, map[string]string, corev1.ResourceRequirements, time.Duration, bool) error

// GetExposed polls the status of the expose.
// If the expose is accessible by the current caller, it waits the expose ready and returns the expose result.
Expand Down Expand Up @@ -69,7 +69,16 @@ type genericRestoreExposer struct {
log logrus.FieldLogger
}

func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1.ObjectReference, targetPVCName string, sourceNamespace string, hostingPodLabels map[string]string, resources corev1.ResourceRequirements, timeout time.Duration) error {
func (e *genericRestoreExposer) Expose(
ctx context.Context,
ownerObject corev1.ObjectReference,
targetPVCName string,
sourceNamespace string,
hostingPodLabels map[string]string,
resources corev1.ResourceRequirements,
timeout time.Duration,
privileged bool,
) error {
curLog := e.log.WithFields(logrus.Fields{
"owner": ownerObject.Name,
"target PVC": targetPVCName,
Expand All @@ -87,7 +96,7 @@ func (e *genericRestoreExposer) Expose(ctx context.Context, ownerObject corev1.O
return errors.Errorf("Target PVC %s/%s has already been bound, abort", sourceNamespace, targetPVCName)
}

restorePod, err := e.createRestorePod(ctx, ownerObject, targetPVC, timeout, hostingPodLabels, selectedNode, resources)
restorePod, err := e.createRestorePod(ctx, ownerObject, targetPVC, timeout, hostingPodLabels, selectedNode, resources, privileged)
if err != nil {
return errors.Wrapf(err, "error to create restore pod")
}
Expand Down Expand Up @@ -296,8 +305,16 @@ func (e *genericRestoreExposer) RebindVolume(ctx context.Context, ownerObject co
return nil
}

func (e *genericRestoreExposer) createRestorePod(ctx context.Context, ownerObject corev1.ObjectReference, targetPVC *corev1.PersistentVolumeClaim,
operationTimeout time.Duration, label map[string]string, selectedNode string, resources corev1.ResourceRequirements) (*corev1.Pod, error) {
func (e *genericRestoreExposer) createRestorePod(
ctx context.Context,
ownerObject corev1.ObjectReference,
targetPVC *corev1.PersistentVolumeClaim,
operationTimeout time.Duration,
label map[string]string,
selectedNode string,
resources corev1.ResourceRequirements,
privileged bool,
) (*corev1.Pod, error) {
restorePodName := ownerObject.Name
restorePVCName := ownerObject.Name

Expand Down Expand Up @@ -384,6 +401,12 @@ func (e *genericRestoreExposer) createRestorePod(ctx context.Context, ownerObjec
},
}

if privileged {
pod.Spec.Containers[0].SecurityContext = &corev1.SecurityContext{
Privileged: boolptr.True(),
}
}

return e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Create(ctx, pod, metav1.CreateOptions{})
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/exposer/generic_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func TestRestoreExpose(t *testing.T) {
}
}

err := exposer.Expose(context.Background(), ownerObject, test.targetPVCName, test.sourceNamespace, map[string]string{}, corev1.ResourceRequirements{}, time.Millisecond)
err := exposer.Expose(context.Background(), ownerObject, test.targetPVCName, test.sourceNamespace, map[string]string{}, corev1.ResourceRequirements{}, time.Millisecond, false)
assert.EqualError(t, err, test.err)
})
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/exposer/mocks/generic_restore.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit cc4d1ce

Please sign in to comment.