Skip to content

Commit

Permalink
Make Pod informer registration optional
Browse files Browse the repository at this point in the history
  • Loading branch information
saikat-royc committed Jun 29, 2020
1 parent 0df1aa7 commit d125c02
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 29 deletions.
7 changes: 5 additions & 2 deletions cmd/csi-resizer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ import (
"context"
"flag"
"fmt"
"k8s.io/client-go/util/workqueue"
"os"
"time"

"k8s.io/client-go/util/workqueue"

"github.com/kubernetes-csi/csi-lib-utils/leaderelection"
"github.com/kubernetes-csi/external-resizer/pkg/controller"
"github.com/kubernetes-csi/external-resizer/pkg/resizer"
Expand Down Expand Up @@ -53,6 +54,8 @@ var (
metricsAddress = flag.String("metrics-address", "", "The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). The default is empty string, which means metrics endpoint is disabled.")
metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.")

handleVolumeInUseError = flag.Bool("handle-volume-inuse-error", true, "Flag to turn on/off capability to handle volume in use error in resizer controller. Defaults to true if not set.")

version = "unknown"
)

Expand Down Expand Up @@ -88,7 +91,7 @@ func main() {
resizerName := csiResizer.Name()
rc := controller.NewResizeController(resizerName, csiResizer, kubeClient, *resyncPeriod, informerFactory,
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
)
*handleVolumeInUseError)
run := func(ctx context.Context) {
informerFactory.Start(wait.NeverStop)
rc.Run(*workers, ctx)
Expand Down
58 changes: 33 additions & 25 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ type resizeController struct {

usedPVCs *inUsePVCStore

podLister corelisters.PodLister
podListerSynced cache.InformerSynced
podLister corelisters.PodLister
podListerSynced cache.InformerSynced
handleVolumeInUseError bool
}

// NewResizeController returns a ResizeController.
Expand All @@ -73,13 +74,10 @@ func NewResizeController(
kubeClient kubernetes.Interface,
resyncPeriod time.Duration,
informerFactory informers.SharedInformerFactory,
pvcRateLimiter workqueue.RateLimiter) ResizeController {
pvcRateLimiter workqueue.RateLimiter,
handleVolumeInUseError bool) ResizeController {
pvInformer := informerFactory.Core().V1().PersistentVolumes()
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()

// list pods so as we can identify PVC that are in-use
podInformer := informerFactory.Core().V1().Pods()

eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events(v1.NamespaceAll)})
Expand All @@ -90,18 +88,17 @@ func NewResizeController(
pvcRateLimiter, fmt.Sprintf("%s-pvc", name))

ctrl := &resizeController{
name: name,
resizer: resizer,
kubeClient: kubeClient,
pvLister: pvInformer.Lister(),
pvSynced: pvInformer.Informer().HasSynced,
pvcLister: pvcInformer.Lister(),
pvcSynced: pvcInformer.Informer().HasSynced,
podLister: podInformer.Lister(),
podListerSynced: podInformer.Informer().HasSynced,
claimQueue: claimQueue,
eventRecorder: eventRecorder,
usedPVCs: newUsedPVCStore(),
name: name,
resizer: resizer,
kubeClient: kubeClient,
pvLister: pvInformer.Lister(),
pvSynced: pvInformer.Informer().HasSynced,
pvcLister: pvcInformer.Lister(),
pvcSynced: pvcInformer.Informer().HasSynced,
claimQueue: claimQueue,
eventRecorder: eventRecorder,
usedPVCs: newUsedPVCStore(),
handleVolumeInUseError: handleVolumeInUseError,
}

// Add a resync period as the PVC's request size can be resized again when we handling
Expand All @@ -112,11 +109,18 @@ func NewResizeController(
DeleteFunc: ctrl.deletePVC,
}, resyncPeriod)

podInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
AddFunc: ctrl.addPod,
DeleteFunc: ctrl.deletePod,
UpdateFunc: ctrl.updatePod,
}, resyncPeriod)
if handleVolumeInUseError {
// list pods so as we can identify PVC that are in-use
klog.Infof("Register Pod informer for resizer %s", ctrl.name)
podInformer := informerFactory.Core().V1().Pods()
ctrl.podLister = podInformer.Lister()
ctrl.podListerSynced = podInformer.Informer().HasSynced
podInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
AddFunc: ctrl.addPod,
DeleteFunc: ctrl.deletePod,
UpdateFunc: ctrl.updatePod,
}, resyncPeriod)
}

return ctrl
}
Expand Down Expand Up @@ -235,8 +239,12 @@ func (ctrl *resizeController) Run(
defer klog.Infof("Shutting down external resizer %s", ctrl.name)

stopCh := ctx.Done()
informersSyncd := []cache.InformerSynced{ctrl.pvSynced, ctrl.pvcSynced}
if ctrl.handleVolumeInUseError {
informersSyncd = append(informersSyncd, ctrl.podListerSynced)
}

if !cache.WaitForCacheSync(stopCh, ctrl.pvSynced, ctrl.pvcSynced, ctrl.podListerSynced) {
if !cache.WaitForCacheSync(stopCh, informersSyncd...) {
klog.Errorf("Cannot sync pod, pv or pvc caches")
return
}
Expand Down
64 changes: 62 additions & 2 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
func TestController(t *testing.T) {
blockVolumeMode := v1.PersistentVolumeBlock
fsVolumeMode := v1.PersistentVolumeFilesystem

for _, test := range []struct {
Name string
PVC *v1.PersistentVolumeClaim
Expand All @@ -37,7 +38,8 @@ func TestController(t *testing.T) {
// is PVC being expanded in-use
pvcInUse bool
// does PVC being expanded has Failed Precondition errors
pvcHasInUseErrors bool
pvcHasInUseErrors bool
disableVolumeInUseErrorHandler bool
}{
{
Name: "Invalid key",
Expand Down Expand Up @@ -130,6 +132,64 @@ func TestController(t *testing.T) {
pvcHasInUseErrors: true,
pvcInUse: false,
},
// test cases with volume in use error handling disabled.
{
Name: "With volume-in-use error handler disabled, Resize PVC, no FS resize, pvc-inuse with failedprecondition",
PVC: createPVC(2, 1),
PV: createPV(1, "testPVC", defaultNS, "foobar", &fsVolumeMode),
CreateObjects: true,
CallCSIExpand: true,
pvcHasInUseErrors: true,
pvcInUse: true,
disableVolumeInUseErrorHandler: true,
},
{
Name: "With volume-in-use error handler disabled, Resize PVC, no FS resize, pvc-inuse but no failedprecondition error",
PVC: createPVC(2, 1),
PV: createPV(1, "testPVC", defaultNS, "foobar", &fsVolumeMode),
CreateObjects: true,
CallCSIExpand: true,
pvcHasInUseErrors: false,
pvcInUse: true,
disableVolumeInUseErrorHandler: true,
},
{
Name: "With volume-in-use error handler disabled, Resize PVC, no FS resize, pvc not in-use but has failedprecondition error",
PVC: createPVC(2, 1),
PV: createPV(1, "testPVC", defaultNS, "foobar", &fsVolumeMode),
CreateObjects: true,
CallCSIExpand: true,
pvcHasInUseErrors: true,
pvcInUse: false,
disableVolumeInUseErrorHandler: true,
},
{
Name: "With volume-in-use error handler disabled, Block Resize PVC with FS resize",
PVC: createPVC(2, 1),
PV: createPV(1, "testPVC", defaultNS, "foobar", &blockVolumeMode),
CreateObjects: true,
NodeResize: true,
CallCSIExpand: true,
expectBlockVolume: true,
disableVolumeInUseErrorHandler: true,
},
{
Name: "With volume-in-use error handler disabled, Resize PVC with FS resize",
PVC: createPVC(2, 1),
PV: createPV(1, "testPVC", defaultNS, "foobar", &fsVolumeMode),
CreateObjects: true,
NodeResize: true,
CallCSIExpand: true,
disableVolumeInUseErrorHandler: true,
},
{
Name: "With volume-in-use error handler disabled, Resize PVC, no FS resize",
PVC: createPVC(2, 1),
PV: createPV(1, "testPVC", defaultNS, "foobar", &fsVolumeMode),
CreateObjects: true,
CallCSIExpand: true,
disableVolumeInUseErrorHandler: true,
},
} {
client := csi.NewMockClient("mock", test.NodeResize, true, true)
driverName, _ := client.GetDriverName(context.TODO())
Expand Down Expand Up @@ -163,7 +223,7 @@ func TestController(t *testing.T) {
t.Fatalf("Test %s: Unable to create resizer: %v", test.Name, err)
}

controller := NewResizeController(driverName, csiResizer, kubeClient, time.Second, informerFactory, workqueue.DefaultControllerRateLimiter())
controller := NewResizeController(driverName, csiResizer, kubeClient, time.Second, informerFactory, workqueue.DefaultControllerRateLimiter(), !test.disableVolumeInUseErrorHandler)

ctrlInstance, _ := controller.(*resizeController)

Expand Down

0 comments on commit d125c02

Please sign in to comment.