Continue provisioning deleted volumes #29

Merged: 4 commits, Feb 21, 2019
Changes from all commits
controller/controller.go (167 changes: 121 additions & 46 deletions)
@@ -80,6 +80,8 @@ const annSelectedNode = "volume.kubernetes.io/selected-node"
// Finalizer for PVs so we know to clean them up
const finalizerPV = "external-provisioner.volume.kubernetes.io/finalizer"

const uidIndex = "uid"

// ProvisionController is a controller that provisions PersistentVolumes for
// PersistentVolumeClaims.
type ProvisionController struct {
@@ -104,8 +106,8 @@ type ProvisionController struct {
// * 1.6: storage classes enter GA
kubeVersion *utilversion.Version

claimInformer cache.SharedInformer
claims cache.Store
claimInformer cache.SharedIndexInformer
claimsIndexer cache.Indexer
volumeInformer cache.SharedInformer
volumes cache.Store
classInformer cache.SharedInformer
@@ -153,6 +155,9 @@ type ProvisionController struct {

hasRun bool
hasRunLock *sync.Mutex

// Map UID -> *PVC with all claims that may be provisioned in the background.
claimsInProgress sync.Map
}

const (
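
Note: the ProvisioningState values and the ProvisionerExt interface used throughout this diff are defined in volume.go, which is not part of this file's changes. A sketch of what those declarations presumably look like, inferred from how they are used below:

	// Sketch only; the authoritative definitions live in volume.go (not in this diff).
	type ProvisioningState string

	const (
		// ProvisioningInBackground: provisioning may still be running in the background.
		ProvisioningInBackground ProvisioningState = "Background"
		// ProvisioningFinished: provisioning is definitely over, the error (if any) is final.
		ProvisioningFinished ProvisioningState = "Finished"
		// ProvisioningNoChange: the attempt changed nothing; keep the previous state.
		ProvisioningNoChange ProvisioningState = "NoChange"
	)

	// ProvisionerExt extends Provisioner with a variant that reports whether a
	// failed attempt may still complete in the background.
	type ProvisionerExt interface {
		ProvisionExt(options VolumeOptions) (*v1.PersistentVolume, ProvisioningState, error)
	}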
@@ -376,7 +381,7 @@ func RetryPeriod(retryPeriod time.Duration) func(*ProvisionController) error {

// ClaimsInformer sets the informer to use for accessing PersistentVolumeClaims.
// Defaults to using an internal informer.
func ClaimsInformer(informer cache.SharedInformer) func(*ProvisionController) error {
func ClaimsInformer(informer cache.SharedIndexInformer) func(*ProvisionController) error {
return func(c *ProvisionController) error {
if c.HasRun() {
return errRuntime
@@ -550,9 +555,12 @@ func NewProvisionController(
// PersistentVolumeClaims

claimHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.claimQueue, newObj) },
DeleteFunc: func(obj interface{}) { controller.forgetWork(controller.claimQueue, obj) },
AddFunc: func(obj interface{}) { controller.enqueueClaim(obj) },
UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueClaim(newObj) },
DeleteFunc: func(obj interface{}) {
// NOOP. Either the claim is in claimsInProgress and in the queue, so it will be processed as usual,
// or it's not in claimsInProgress and then we don't care.
},
}

if controller.claimInformer != nil {
@@ -561,15 +569,22 @@
controller.claimInformer = informer.Core().V1().PersistentVolumeClaims().Informer()
controller.claimInformer.AddEventHandler(claimHandler)
}
controller.claims = controller.claimInformer.GetStore()
controller.claimInformer.AddIndexers(cache.Indexers{uidIndex: func(obj interface{}) ([]string, error) {
uid, err := getObjectUID(obj)
if err != nil {
return nil, err
}
return []string{uid}, nil
}})
controller.claimsIndexer = controller.claimInformer.GetIndexer()
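
For readers unfamiliar with client-go indexers: the uid index registered above makes claims retrievable by UID alone, independent of namespace/name. A minimal standalone illustration of the same mechanics (hypothetical example, not part of this PR):

	package main

	import (
		"fmt"

		v1 "k8s.io/api/core/v1"
		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
		"k8s.io/client-go/tools/cache"
	)

	func main() {
		// Index objects by UID in addition to the usual namespace/name key.
		indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
			"uid": func(obj interface{}) ([]string, error) {
				return []string{string(obj.(metav1.Object).GetUID())}, nil
			},
		})

		pvc := &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{
			Namespace: "default", Name: "data", UID: "1234-abcd",
		}}
		_ = indexer.Add(pvc)

		// Look the claim up by UID, as syncClaimHandler does below.
		objs, _ := indexer.ByIndex("uid", "1234-abcd")
		fmt.Println(len(objs)) // prints 1
	}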

// -----------------
// PersistentVolumes

volumeHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) },
DeleteFunc: func(obj interface{}) { controller.forgetWork(controller.volumeQueue, obj) },
AddFunc: func(obj interface{}) { controller.enqueueVolume(obj) },
UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueVolume(newObj) },
DeleteFunc: func(obj interface{}) { controller.forgetVolume(obj) },
}

if controller.volumeInformer != nil {
@@ -596,9 +611,37 @@
return controller
}

// enqueueWork takes an obj and converts it into a namespace/name string which
func getObjectUID(obj interface{}) (string, error) {
var object metav1.Object
var ok bool
if object, ok = obj.(metav1.Object); !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return "", fmt.Errorf("error decoding object, invalid type")
}
object, ok = tombstone.Obj.(metav1.Object)
if !ok {
return "", fmt.Errorf("error decoding object tombstone, invalid type")
}
}
return string(object.GetUID()), nil
}

// enqueueClaim takes an obj and converts it into a UID that is then put onto the claim work queue.
func (ctrl *ProvisionController) enqueueClaim(obj interface{}) {
uid, err := getObjectUID(obj)
if err != nil {
utilruntime.HandleError(err)
return
}
if ctrl.claimQueue.NumRequeues(uid) == 0 {
ctrl.claimQueue.Add(uid)
}
}

// enqueueVolume takes an obj and converts it into a namespace/name string which
// is then put onto the volume work queue.
func (ctrl *ProvisionController) enqueueWork(queue workqueue.RateLimitingInterface, obj interface{}) {
func (ctrl *ProvisionController) enqueueVolume(obj interface{}) {
var key string
var err error
if key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err != nil {
@@ -607,22 +650,22 @@ func (ctrl *ProvisionController) enqueueWork(queue workqueue.RateLimitingInterfa
}
// Re-Adding is harmless but try to add it to the queue only if it is not
// already there, because if it is already there we *must* be retrying it
if queue.NumRequeues(key) == 0 {
queue.Add(key)
if ctrl.volumeQueue.NumRequeues(key) == 0 {
ctrl.volumeQueue.Add(key)
}
}

// forgetWork Forgets an obj from the given work queue, telling the queue to
// forgetVolume forgets an obj from the volume work queue, telling the queue to
// stop tracking its retries because e.g. the obj was deleted
func (ctrl *ProvisionController) forgetWork(queue workqueue.RateLimitingInterface, obj interface{}) {
func (ctrl *ProvisionController) forgetVolume(obj interface{}) {
var key string
var err error
if key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err != nil {
utilruntime.HandleError(err)
return
}
queue.Forget(key)
queue.Done(key)
ctrl.volumeQueue.Forget(key)
ctrl.volumeQueue.Done(key)
}

// Run starts all of this controller's control loops
@@ -746,7 +789,7 @@ func (ctrl *ProvisionController) processNextClaimWorkItem() bool {
return fmt.Errorf("expected string in workqueue but got %#v", obj)
}

if err := ctrl.syncClaimHandler(key); err != nil {
if _, err := ctrl.syncClaimHandler(key); err != nil {
if ctrl.failedProvisionThreshold == 0 {
glog.Warningf("Retrying syncing claim %q, failure %v", key, ctrl.claimQueue.NumRequeues(obj))
ctrl.claimQueue.AddRateLimited(obj)
@@ -755,13 +798,17 @@
ctrl.claimQueue.AddRateLimited(obj)
} else {
glog.Errorf("Giving up syncing claim %q because failures %v >= threshold %v", key, ctrl.claimQueue.NumRequeues(obj), ctrl.failedProvisionThreshold)
glog.V(2).Infof("Removing PVC %s from claims in progress", key)
ctrl.claimsInProgress.Delete(key) // This can leak a volume that's being provisioned in the background!
// Done but do not Forget: it will not be in the queue but NumRequeues
// will be saved until the obj is deleted from kubernetes
}
return fmt.Errorf("error syncing claim %q: %s", key, err.Error())
}

ctrl.claimQueue.Forget(obj)
glog.V(2).Infof("Provisioning succeeded, removing PVC %s from claims in progress", key)
ctrl.claimsInProgress.Delete(key)
return nil
}(obj)
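
The failedProvisionThreshold consulted above is set via a controller option at construction time; once NumRequeues reaches it, the controller stops retrying and drops the claim from claimsInProgress, which (as the code comment warns) can leak a background volume. A hedged usage sketch, assuming this release's NewProvisionController signature and the library's FailedProvisionThreshold option:

	import (
		"k8s.io/apimachinery/pkg/util/wait"
		"k8s.io/client-go/kubernetes"
		"sigs.k8s.io/sig-storage-lib-external-provisioner/controller"
	)

	// Hypothetical wiring; clientset, prov, and version come from the caller.
	func startController(clientset kubernetes.Interface, prov controller.Provisioner, version string) {
		pc := controller.NewProvisionController(
			clientset,
			"example.com/my-provisioner", // hypothetical provisioner name
			prov,                         // may also implement ProvisionerExt
			version,
			controller.FailedProvisionThreshold(5), // give up after 5 failures; 0 = retry forever
		)
		pc.Run(wait.NeverStop) // blocks until the stop channel closes
	}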

@@ -818,17 +865,40 @@ func (ctrl *ProvisionController) processNextVolumeWorkItem() bool {
}

// syncClaimHandler gets the claim from informer's cache then calls syncClaim
func (ctrl *ProvisionController) syncClaimHandler(key string) error {
claimObj, exists, err := ctrl.claims.GetByKey(key)
func (ctrl *ProvisionController) syncClaimHandler(key string) (ProvisioningState, error) {
objs, err := ctrl.claimsIndexer.ByIndex(uidIndex, key)
if err != nil {
return err
return ProvisioningFinished, err
}
if !exists {
utilruntime.HandleError(fmt.Errorf("claim %q in work queue no longer exists", key))
return nil
var claimObj interface{}
if len(objs) > 0 {
claimObj = objs[0]
} else {
obj, found := ctrl.claimsInProgress.Load(key)
if !found {
utilruntime.HandleError(fmt.Errorf("claim %q in work queue no longer exists", key))
return ProvisioningFinished, nil
}
claimObj = obj
}
status, err := ctrl.syncClaim(claimObj)
if err == nil || status == ProvisioningFinished {
// Provisioning is 100% finished / not in progress.
glog.V(2).Infof("Final error received, removing PVC %s from claims in progress", key)
ctrl.claimsInProgress.Delete(key)
return status, err
}
if status == ProvisioningInBackground {
// Provisioning is in progress in background.
glog.V(2).Infof("Temporary error received, adding PVC %s to claims in progress", key)
ctrl.claimsInProgress.Store(key, claimObj)
} else {
// status == ProvisioningNoChange.
// Don't change claimsInProgress:
// - the claim is already there if previous status was ProvisioningInBackground.
// - the claim is not there if previous status was ProvisioningFinished.
}

return ctrl.syncClaim(claimObj)
return status, err
}

// syncVolumeHandler gets the volume from informer's cache then calls syncVolume
@@ -847,19 +917,19 @@ func (ctrl *ProvisionController) syncVolumeHandler(key string) error {

// syncClaim checks if the claim should have a volume provisioned for it and
// provisions one if so.
func (ctrl *ProvisionController) syncClaim(obj interface{}) error {
func (ctrl *ProvisionController) syncClaim(obj interface{}) (ProvisioningState, error) {
claim, ok := obj.(*v1.PersistentVolumeClaim)
if !ok {
return fmt.Errorf("expected claim but got %+v", obj)
return ProvisioningFinished, fmt.Errorf("expected claim but got %+v", obj)
}

if ctrl.shouldProvision(claim) {
startTime := time.Now()
err := ctrl.provisionClaimOperation(claim)
status, err := ctrl.provisionClaimOperation(claim)
ctrl.updateProvisionStats(claim, err, startTime)
return err
return status, err
}
return nil
return ProvisioningFinished, nil
}

// syncVolume checks if the volume should be deleted and deletes if so
@@ -1006,7 +1076,7 @@ func (ctrl *ProvisionController) updateDeleteStats(volume *v1.PersistentVolume,
// provisionClaimOperation attempts to provision a volume for the given claim.
// Returns ProvisioningState and error, which indicate whether provisioning
// should be retried (requeue the claim) or not
func (ctrl *ProvisionController) provisionClaimOperation(claim *v1.PersistentVolumeClaim) error {
func (ctrl *ProvisionController) provisionClaimOperation(claim *v1.PersistentVolumeClaim) (ProvisioningState, error) {
// Most code here is identical to that found in controller.go of kube's PV controller...
claimClass := util.GetPersistentVolumeClaimClass(claim)
operation := fmt.Sprintf("provision %q class %q", claimToClaimKey(claim), claimClass)
@@ -1020,48 +1090,48 @@ func (ctrl *ProvisionController) provisionClaimOperation(claim *v1.PersistentVol
if err == nil && volume != nil {
// Volume has been already provisioned, nothing to do.
glog.Info(logOperation(operation, "persistentvolume %q already exists, skipping", pvName))
return nil
return ProvisioningFinished, nil
}

// Prepare a claimRef to the claim early (to fail before a volume is
// provisioned)
claimRef, err := ref.GetReference(scheme.Scheme, claim)
if err != nil {
glog.Error(logOperation(operation, "unexpected error getting claim reference: %v", err))
return nil
return ProvisioningNoChange, nil

[Review comment, Contributor] Why NoChange here?

[Reply, Contributor Author] Because I don't know what a previous ProvisionExt() call returned. I know it's very unlikely that GetReference succeeded once, ProvisionExt() returned Background, and a second GetReference then fails, but I prefer to be on the safer side.

}

provisioner, parameters, err := ctrl.getStorageClassFields(claimClass)
if err != nil {
glog.Error(logOperation(operation, "error getting claim's StorageClass's fields: %v", err))
return nil
return ProvisioningFinished, nil
}
if provisioner != ctrl.provisionerName {
// class.Provisioner has either changed since shouldProvision() or
// annDynamicallyProvisioned contains different provisioner than
// class.Provisioner.
glog.Error(logOperation(operation, "unknown provisioner %q requested in claim's StorageClass", provisioner))
return nil
return ProvisioningFinished, nil
}

// Check if this provisioner can provision this claim.
if err = ctrl.canProvision(claim); err != nil {
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", err.Error())
glog.Error(logOperation(operation, "failed to provision volume: %v", err))
return nil
return ProvisioningFinished, nil
}

reclaimPolicy := v1.PersistentVolumeReclaimDelete
if ctrl.kubeVersion.AtLeast(utilversion.MustParseSemantic("v1.8.0")) {
reclaimPolicy, err = ctrl.fetchReclaimPolicy(claimClass)
if err != nil {
return err
return ProvisioningFinished, err
}
}

mountOptions, err := ctrl.fetchMountOptions(claimClass)
if err != nil {
return err
return ProvisioningFinished, err
}

var selectedNode *v1.Node
@@ -1073,7 +1143,7 @@ func (ctrl *ProvisionController) provisionClaimOperation(claim *v1.PersistentVol
if err != nil {
err = fmt.Errorf("failed to get target node: %v", err)
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", err.Error())
return err
return ProvisioningNoChange, err
}
}

@@ -1082,7 +1152,7 @@ func (ctrl *ProvisionController) provisionClaimOperation(claim *v1.PersistentVol
if err != nil {
err = fmt.Errorf("failed to get AllowedTopologies from StorageClass: %v", err)
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", err.Error())
return err
return ProvisioningNoChange, err
}
}

@@ -1098,16 +1168,21 @@

ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, "Provisioning", fmt.Sprintf("External provisioner is provisioning volume for claim %q", claimToClaimKey(claim)))

volume, err = ctrl.provisioner.Provision(options)
result := ProvisioningFinished
if p, ok := ctrl.provisioner.(ProvisionerExt); ok {
volume, result, err = p.ProvisionExt(options)
} else {
volume, err = ctrl.provisioner.Provision(options)
}
if err != nil {
if ierr, ok := err.(*IgnoredError); ok {
// Provision ignored, do nothing and hope another provisioner will provision it.
glog.Info(logOperation(operation, "volume provision ignored: %v", ierr))
return nil
return ProvisioningFinished, nil
}
err = fmt.Errorf("failed to provision volume with StorageClass %q: %v", claimClass, err)
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", err.Error())
return err
return result, err
}

glog.Info(logOperation(operation, "volume %q provisioned", volume.Name))
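
Only provisioners implementing ProvisionerExt can report background state; a plain Provisioner keeps the old semantics, where every attempt is treated as ProvisioningFinished. A sketch of a hypothetical ProvisionerExt implementation that signals background provisioning on timeout (createVolume and isTimeout are assumed helper functions, not part of the library):

	// Hypothetical provisioner; the backend client and helpers are assumptions.
	type myProvisioner struct{ /* backend client etc. */ }

	func (p *myProvisioner) ProvisionExt(options controller.VolumeOptions) (*v1.PersistentVolume, controller.ProvisioningState, error) {
		pv, err := p.createVolume(options) // assumed backend call
		if err != nil {
			if isTimeout(err) {
				// The backend may still be creating the volume: report
				// ProvisioningInBackground so the controller keeps the claim in
				// claimsInProgress and retries even if the PVC is deleted meanwhile.
				return nil, controller.ProvisioningInBackground, err
			}
			// Hard failure: the error is final for this attempt.
			return nil, controller.ProvisioningFinished, err
		}
		return pv, controller.ProvisioningFinished, nil
	}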
@@ -1180,7 +1255,7 @@ func (ctrl *ProvisionController) provisionClaimOperation(claim *v1.PersistentVol
}

glog.Info(logOperation(operation, "succeeded"))
return nil
return ProvisioningFinished, nil
}

// deleteVolumeOperation attempts to delete the volume backing the given