Convert Pod Volume Restore resource/controller to the Kubebuilder framework

Fixes vmware-tanzu#4134

Signed-off-by: Wenkai Yin(尹文开) <yinw@vmware.com>
ywk253100 authored and Xun Jiang committed May 5, 2022
1 parent 861f106 commit 494d357
Showing 13 changed files with 356 additions and 669 deletions.
2 changes: 1 addition & 1 deletion Tiltfile
@@ -103,7 +103,7 @@ local_resource(

local_resource(
"restic_binary",
cmd = 'cd ' + '.' + ';mkdir -p _tiltbuild/restic; BIN=velero GOOS=linux GOARCH=amd64 RESTIC_VERSION=0.12.0 OUTPUT_DIR=_tiltbuild/restic ./hack/download-restic.sh',
cmd = 'cd ' + '.' + ';mkdir -p _tiltbuild/restic; BIN=velero GOOS=linux GOARCH=amd64 RESTIC_VERSION=0.13.1 OUTPUT_DIR=_tiltbuild/restic ./hack/download-restic.sh',
)

# Note: we need a distro with a bash shell to exec into the Velero container
1 change: 1 addition & 0 deletions changelogs/unreleased/4655-ywk253100
@@ -0,0 +1 @@
Convert Pod Volume Restore resource/controller to the Kubebuilder framework
34 changes: 33 additions & 1 deletion config/crd/v1/bases/velero.io_podvolumerestores.yaml
@@ -16,7 +16,37 @@ spec:
singular: podvolumerestore
scope: Namespaced
versions:
- name: v1
- additionalPrinterColumns:
- description: Namespace of the pod containing the volume to be restored
jsonPath: .spec.pod.namespace
name: Namespace
type: string
- description: Name of the pod containing the volume to be restored
jsonPath: .spec.pod.name
name: Pod
type: string
- description: Name of the volume to be restored
jsonPath: .spec.volume
name: Volume
type: string
- description: Pod Volume Restore status such as New/InProgress
jsonPath: .status.phase
name: Status
type: string
- description: Total number of bytes to restore
format: int64
jsonPath: .status.progress.totalBytes
name: TotalBytes
type: integer
- description: Number of bytes already restored
format: int64
jsonPath: .status.progress.bytesDone
name: BytesDone
type: integer
- jsonPath: .metadata.creationTimestamp
name: Age
type: date
name: v1
schema:
openAPIV3Schema:
properties:
@@ -136,6 +166,8 @@ spec:
type: object
served: true
storage: true
subresources:
status: {}
status:
acceptedNames:
kind: ""
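The new subresources stanza exposes the status subresource on this CRD. For context, a minimal sketch (not part of this commit; markInProgress is an illustrative name) of what that implies for a controller-runtime client: status changes must go through the dedicated status writer, since a plain Update ignores .status once the subresource is enabled.

package example

import (
	"context"

	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// markInProgress is illustrative only; the name is not from this commit.
func markInProgress(ctx context.Context, c client.Client, pvr *velerov1api.PodVolumeRestore) error {
	pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseInProgress
	// With the status subresource enabled, .status changes are persisted
	// only via the dedicated status writer; c.Update would drop them.
	return c.Status().Update(ctx, pvr)
}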
2 changes: 1 addition & 1 deletion config/crd/v1/crds/crds.go

Large diffs are not rendered by default.

38 changes: 38 additions & 0 deletions config/rbac/role.yaml
@@ -6,6 +6,24 @@ metadata:
creationTimestamp: null
name: velero-perms
rules:
- apiGroups:
- ""
resources:
- persistentvolumeclaims
verbs:
- get
- apiGroups:
- ""
resources:
- persistentvolumes
verbs:
- get
- apiGroups:
- ""
resources:
- pods
verbs:
- get
- apiGroups:
- velero.io
resources:
@@ -93,6 +111,26 @@ rules:
- get
- patch
- update
- apiGroups:
- velero.io
resources:
- podvolumerestores
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- velero.io
resources:
- podvolumerestores/status
verbs:
- get
- patch
- update
- apiGroups:
- velero.io
resources:
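These rules are normally generated by controller-gen from +kubebuilder:rbac markers on the reconcilers. The markers for the restore controller are not visible in this excerpt; a plausible sketch, mirroring the podvolumebackups marker shown further down in this diff:

// Assumed markers (illustrative; the authoritative ones live in the controller sources):
// +kubebuilder:rbac:groups=velero.io,resources=podvolumerestores,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=velero.io,resources=podvolumerestores/status,verbs=get;update;patch
// +kubebuilder:rbac:groups="",resources=pods;persistentvolumes;persistentvolumeclaims,verbs=get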
@@ -81,8 +81,20 @@ type PodVolumeRestoreStatus struct {
Progress PodVolumeOperationProgress `json:"progress,omitempty"`
}

// TODO(2.0) After converting all resources to use the runtime-controller client, the genclient and k8s:deepcopy markers will no longer be needed and should be removed.
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:object:generate=true
// +kubebuilder:object:root=true
// +kubebuilder:storageversion
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="Namespace",type="string",JSONPath=".spec.pod.namespace",description="Namespace of the pod containing the volume to be restored"
// +kubebuilder:printcolumn:name="Pod",type="string",JSONPath=".spec.pod.name",description="Name of the pod containing the volume to be restored"
// +kubebuilder:printcolumn:name="Volume",type="string",JSONPath=".spec.volume",description="Name of the volume to be restored"
// +kubebuilder:printcolumn:name="Status",type="string",JSONPath=".status.phase",description="Pod Volume Restore status such as New/InProgress"
// +kubebuilder:printcolumn:name="TotalBytes",type="integer",format="int64",JSONPath=".status.progress.totalBytes",description="Pod Volume Restore status such as New/InProgress"
// +kubebuilder:printcolumn:name="BytesDone",type="integer",format="int64",JSONPath=".status.progress.bytesDone",description="Pod Volume Restore status such as New/InProgress"
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"

type PodVolumeRestore struct {
metav1.TypeMeta `json:",inline"`
@@ -98,6 +110,8 @@ type PodVolumeRestore struct {
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:object:generate=true
// +kubebuilder:object:root=true

// PodVolumeRestoreList is a list of PodVolumeRestores.
type PodVolumeRestoreList struct {
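With +kubebuilder:object:root=true, controller-gen emits DeepCopyObject methods, so these types satisfy runtime.Object and can be used directly with the typed controller-runtime client. A minimal sketch (assumed helper, not from this commit):

package example

import (
	"context"

	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// listRestores lists PodVolumeRestores in a namespace through the typed client.
func listRestores(ctx context.Context, c client.Client, namespace string) (*velerov1api.PodVolumeRestoreList, error) {
	list := &velerov1api.PodVolumeRestoreList{}
	if err := c.List(ctx, list, client.InNamespace(namespace)); err != nil {
		return nil, err
	}
	return list, nil
}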
125 changes: 40 additions & 85 deletions pkg/cmd/cli/restic/server.go
@@ -30,27 +30,23 @@ import (
v1 "k8s.io/api/core/v1"
storagev1api "k8s.io/api/storage/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
kubeinformers "k8s.io/client-go/informers"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/vmware-tanzu/velero/internal/credentials"
"github.com/vmware-tanzu/velero/internal/util/managercontroller"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/buildinfo"
"github.com/vmware-tanzu/velero/pkg/client"
"github.com/vmware-tanzu/velero/pkg/cmd"
"github.com/vmware-tanzu/velero/pkg/cmd/util/signals"
"github.com/vmware-tanzu/velero/pkg/controller"
clientset "github.com/vmware-tanzu/velero/pkg/generated/clientset/versioned"
informers "github.com/vmware-tanzu/velero/pkg/generated/informers/externalversions"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/restic"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
@@ -101,45 +97,17 @@ func NewServerCommand(f client.Factory) *cobra.Command {
}

type resticServer struct {
kubeClient kubernetes.Interface
veleroClient clientset.Interface
veleroInformerFactory informers.SharedInformerFactory
kubeInformerFactory kubeinformers.SharedInformerFactory
podInformer cache.SharedIndexInformer
logger logrus.FieldLogger
ctx context.Context
cancelFunc context.CancelFunc
fileSystem filesystem.Interface
mgr manager.Manager
metrics *metrics.ServerMetrics
metricsAddress string
namespace string
logger logrus.FieldLogger
ctx context.Context
cancelFunc context.CancelFunc
fileSystem filesystem.Interface
mgr manager.Manager
metrics *metrics.ServerMetrics
metricsAddress string
namespace string
}

func newResticServer(logger logrus.FieldLogger, factory client.Factory, metricAddress string) (*resticServer, error) {

kubeClient, err := factory.KubeClient()
if err != nil {
return nil, err
}

veleroClient, err := factory.Client()
if err != nil {
return nil, err
}

// use a stand-alone pod informer because we want to use a field selector to
// filter to only pods scheduled on this node.
podInformer := corev1informers.NewFilteredPodInformer(
kubeClient,
metav1.NamespaceAll,
0,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
func(opts *metav1.ListOptions) {
opts.FieldSelector = fmt.Sprintf("spec.nodeName=%s", os.Getenv("NODE_NAME"))
},
)

ctx, cancelFunc := context.WithCancel(context.Background())

clientConfig, err := factory.ClientConfig()
@@ -152,29 +120,40 @@ func newResticServer(logger logrus.FieldLogger, factory client.Factory, metricAd
velerov1api.AddToScheme(scheme)
v1.AddToScheme(scheme)
storagev1api.AddToScheme(scheme)

// use a field selector to filter to only pods scheduled on this node.
cacheOption := cache.Options{
SelectorsByObject: cache.SelectorsByObject{
&v1.Pod{}: {
Field: fields.Set{"spec.nodeName": os.Getenv("NODE_NAME")}.AsSelector(),
},
},
}
mgr, err := ctrl.NewManager(clientConfig, ctrl.Options{
Scheme: scheme,
Scheme: scheme,
NewCache: cache.BuilderWithOptions(cacheOption),
})
if err != nil {
return nil, err
}

s := &resticServer{
kubeClient: kubeClient,
veleroClient: veleroClient,
veleroInformerFactory: informers.NewFilteredSharedInformerFactory(veleroClient, 0, factory.Namespace(), nil),
kubeInformerFactory: kubeinformers.NewSharedInformerFactory(kubeClient, 0),
podInformer: podInformer,
logger: logger,
ctx: ctx,
cancelFunc: cancelFunc,
fileSystem: filesystem.NewFileSystem(),
mgr: mgr,
metricsAddress: metricAddress,
namespace: factory.Namespace(),
logger: logger,
ctx: ctx,
cancelFunc: cancelFunc,
fileSystem: filesystem.NewFileSystem(),
mgr: mgr,
metricsAddress: metricAddress,
namespace: factory.Namespace(),
}

if err := s.validatePodVolumesHostPath(); err != nil {
// The cache isn't initialized yet when validatePodVolumesHostPath is called, so the client returned
// by the manager cannot be used; fall back to a plain kube client here.
client, err := factory.KubeClient()
if err != nil {
return nil, err
}
if err := s.validatePodVolumesHostPath(client); err != nil {
return nil, err
}
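The SelectorsByObject option above replaces the old stand-alone filtered pod informer: the manager's shared cache itself only watches pods on this node. A sketch (assumed helper, not from this commit) of the resulting behavior:

package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

// nodePods lists pods through the manager's client; because the cache is
// built with the spec.nodeName field selector, only pods scheduled on
// this node are ever returned.
func nodePods(ctx context.Context, mgr manager.Manager) (*corev1.PodList, error) {
	pods := &corev1.PodList{}
	if err := mgr.GetClient().List(ctx, pods); err != nil {
		return nil, err
	}
	return pods, nil
}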

@@ -218,38 +197,14 @@ func (s *resticServer) run() {
FileSystem: filesystem.NewFileSystem(),
ResticExec: restic.BackupExec{},
Log: s.logger,

PvLister: s.kubeInformerFactory.Core().V1().PersistentVolumes().Lister(),
PvcLister: s.kubeInformerFactory.Core().V1().PersistentVolumeClaims().Lister(),
}
if err := pvbReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.PodVolumeBackup)
}

restoreController := controller.NewPodVolumeRestoreController(
s.logger,
s.veleroInformerFactory.Velero().V1().PodVolumeRestores(),
s.veleroClient.VeleroV1(),
s.podInformer,
s.kubeInformerFactory.Core().V1().PersistentVolumeClaims(),
s.kubeInformerFactory.Core().V1().PersistentVolumes(),
s.mgr.GetClient(),
os.Getenv("NODE_NAME"),
credentialFileStore,
)

go s.veleroInformerFactory.Start(s.ctx.Done())
go s.kubeInformerFactory.Start(s.ctx.Done())
go s.podInformer.Run(s.ctx.Done())

// TODO(2.0): presuming all controllers and resources are converted to runtime-controller
// by v2.0, the block from this line and including the `s.mgr.Start() will be
// deprecated, since the manager auto-starts all the caches. Until then, we need to start the
// cache for them manually.

// Adding the controllers to the manager will register them as a (runtime-controller) runnable,
// so the manager will ensure the cache is started and ready before all controller are started
s.mgr.Add(managercontroller.Runnable(restoreController, 1))
if err = controller.NewPodVolumeRestoreReconciler(s.logger, s.mgr.GetClient(), credentialFileStore).SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller")
}
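NewPodVolumeRestoreReconciler and its SetupWithManager live in pkg/controller/pod_volume_restore_controller.go, which is not rendered in this excerpt. In controller-runtime terms the registration reduces to something like the sketch below (the real controller likely also watches Pods so restores are picked up once the target pod is running; the stubbed struct and empty Reconcile are placeholders, not the actual implementation):

package example

import (
	"context"

	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
	ctrl "sigs.k8s.io/controller-runtime"
)

// PodVolumeRestoreReconciler is stubbed here; the real struct carries the
// client, logger, and credentials file store.
type PodVolumeRestoreReconciler struct{}

// Reconcile is a no-op placeholder so the sketch compiles.
func (r *PodVolumeRestoreReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	return ctrl.Result{}, nil
}

// SetupWithManager registers the reconciler for PodVolumeRestore events.
func (r *PodVolumeRestoreReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&velerov1api.PodVolumeRestore{}).
		Complete(r)
}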

s.logger.Info("Controllers starting...")

@@ -260,7 +215,7 @@

// validatePodVolumesHostPath validates that the pod volumes path contains a
// directory for each Pod running on this node
func (s *resticServer) validatePodVolumesHostPath() error {
func (s *resticServer) validatePodVolumesHostPath(client kubernetes.Interface) error {
files, err := s.fileSystem.ReadDir("/host_pods/")
if err != nil {
return errors.Wrap(err, "could not read pod volumes host path")
@@ -274,7 +229,7 @@ func (s *resticServer) validatePodVolumesHostPath() error {
}
}

pods, err := s.kubeClient.CoreV1().Pods("").List(s.ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("spec.nodeName=%s,status.phase=Running", os.Getenv("NODE_NAME"))})
pods, err := client.CoreV1().Pods("").List(s.ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("spec.nodeName=%s,status.phase=Running", os.Getenv("NODE_NAME"))})
if err != nil {
return errors.WithStack(err)
}
3 changes: 1 addition & 2 deletions pkg/cmd/cli/restic/server_test.go
@@ -95,12 +95,11 @@ func Test_validatePodVolumesHostPath(t *testing.T) {
}

s := &resticServer{
kubeClient: kubeClient,
logger: testutil.NewLogger(),
fileSystem: fs,
}

err := s.validatePodVolumesHostPath()
err := s.validatePodVolumesHostPath(kubeClient)
if tt.wantErr {
assert.Error(t, err)
} else {
6 changes: 1 addition & 5 deletions pkg/controller/pod_volume_backup_controller.go
@@ -30,7 +30,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
corev1listers "k8s.io/client-go/listers/core/v1"
"sigs.k8s.io/cluster-api/util/patch"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -60,9 +59,6 @@ type PodVolumeBackupReconciler struct {
FileSystem filesystem.Interface
ResticExec BackupExecuter
Log logrus.FieldLogger

PvLister corev1listers.PersistentVolumeLister
PvcLister corev1listers.PersistentVolumeClaimLister
}

// +kubebuilder:rbac:groups=velero.io,resources=podvolumebackups,verbs=get;list;watch;create;update;patch;delete
@@ -302,7 +298,7 @@ type resticDetails struct {
}

func (r *PodVolumeBackupReconciler) buildResticCommand(ctx context.Context, log *logrus.Entry, pvb *velerov1api.PodVolumeBackup, pod *corev1.Pod, details *resticDetails) (*restic.Command, error) {
volDir, err := kube.GetVolumeDirectory(log, pod, pvb.Spec.Volume, r.PvcLister, r.PvLister, r.Client)
volDir, err := kube.GetVolumeDirectory(ctx, log, pod, pvb.Spec.Volume, r.Client)
if err != nil {
return nil, errors.Wrap(err, "getting volume directory name")
}
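kube.GetVolumeDirectory now resolves the PVC and PV through the controller-runtime client instead of the dropped listers. The helper itself is not rendered in this diff; a sketch of its plausible shape (an assumption, not the verbatim Velero implementation):

package example

import (
	"context"

	"github.com/pkg/errors"
	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// getVolumeDirectory sketches the updated lookup: for PVC-backed volumes
// the restic data directory is named after the bound PV, otherwise after
// the pod volume itself.
func getVolumeDirectory(ctx context.Context, pod *corev1.Pod, volumeName string, cli client.Client) (string, error) {
	for _, vol := range pod.Spec.Volumes {
		if vol.Name != volumeName {
			continue
		}
		if vol.PersistentVolumeClaim == nil {
			return volumeName, nil
		}
		pvc := &corev1.PersistentVolumeClaim{}
		if err := cli.Get(ctx, client.ObjectKey{Namespace: pod.Namespace, Name: vol.PersistentVolumeClaim.ClaimName}, pvc); err != nil {
			return "", errors.WithStack(err)
		}
		pv := &corev1.PersistentVolume{}
		if err := cli.Get(ctx, client.ObjectKey{Name: pvc.Spec.VolumeName}, pv); err != nil {
			return "", errors.WithStack(err)
		}
		return pv.Name, nil
	}
	return "", errors.Errorf("volume %s not found in pod %s", volumeName, pod.Name)
}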
(The remaining changed file diffs are not rendered here.)