Skip to content

Commit

Permalink
fix: replicas aren't getting cleaned up during driver uninstall
Browse files Browse the repository at this point in the history
  • Loading branch information
sunpa93 committed Nov 9, 2022
1 parent 4c14adb commit ba91e4f
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 118 deletions.
2 changes: 1 addition & 1 deletion pkg/controller/azdrivernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (r *ReconcileAzDriverNode) Reconcile(ctx context.Context, request reconcile
}

// Delete all volumeAttachments attached to this node, if failed, requeue
if _, err = r.cleanUpAzVolumeAttachmentByNode(ctx, request.Name, azdrivernode, azureutils.AllRoles, detachAndDeleteCRI); err != nil {
if _, err = r.cleanUpAzVolumeAttachmentByNode(ctx, request.Name, azdrivernode, azureutils.AllRoles, cleanUpAttachment); err != nil {
return reconcile.Result{Requeue: true}, err
}
return reconcile.Result{}, nil
Expand Down
197 changes: 89 additions & 108 deletions pkg/controller/azvolume.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,8 @@ func (r *ReconcileAzVolume) triggerCreate(ctx context.Context, azVolume *azdiskv
if createErr != nil {
updateFunc = func(obj client.Object) error {
azv := obj.(*azdiskv1beta2.AzVolume)
azv = r.updateError(azv, createErr)
azv = r.deleteFinalizer(azv, map[string]bool{consts.AzVolumeFinalizer: true})
_, derr := r.updateState(azv, azdiskv1beta2.VolumeCreationFailed, forceUpdate)
_, derr := r.reportError(azv, azdiskv1beta2.VolumeCreationFailed, createErr)
return derr
}
updateMode = azureutils.UpdateAll
Expand Down Expand Up @@ -211,15 +210,6 @@ func (r *ReconcileAzVolume) triggerDelete(ctx context.Context, azVolume *azdiskv
ctx, w := workflow.New(ctx)
defer func() { w.Finish(err) }()

// Determine if this is a controller server requested deletion or driver clean up
volumeDeleteRequested := volumeDeleteRequested(azVolume)
preProvisionCleanupRequested := isPreProvisionCleanupRequested(azVolume)

mode := deleteCRIOnly
if volumeDeleteRequested || preProvisionCleanupRequested {
mode = detachAndDeleteCRI
}

// override volume operation queue to prevent any other replica operation from being executed
release := r.closeOperationQueue(azVolume.Name)
defer func() {
Expand All @@ -228,9 +218,14 @@ func (r *ReconcileAzVolume) triggerDelete(ctx context.Context, azVolume *azdiskv
}
}()

// only try deleting underlying volume 1) if volume creation was successful and 2) volumeDeleteRequestAnnotation is present
// if the annotation is not present, only delete the CRI and not the underlying volume
if isCreated(azVolume) && mode == detachAndDeleteCRI {
// Determine if this is a controller server requested deletion or driver clean up
volumeDeleteRequested := volumeDeleteRequested(azVolume)
preProvisionCleanupRequested := isPreProvisionCleanupRequested(azVolume)

mode := cleanUpAttachmentForUninstall
if volumeDeleteRequested || preProvisionCleanupRequested {
// primary attachments should be detached only if volume is being deleted or pv was deleted.
mode = cleanUpAttachment
// requeue if AzVolume's state is being updated by a different worker
defer r.stateLock.Delete(azVolume.Name)
if _, ok := r.stateLock.LoadOrStore(azVolume.Name, nil); ok {
Expand All @@ -250,68 +245,59 @@ func (r *ReconcileAzVolume) triggerDelete(ctx context.Context, azVolume *azdiskv
if volumeDeleteRequested {
w.Logger().V(5).Info("Deleting Volume...")
}
}

waitCh := make(chan goSignal)
//nolint:contextcheck // call is asynchronous; context is not inherited by design
go func() {
_, goWorkflow := workflow.New(ctx)
var deleteErr error
defer func() { goWorkflow.Finish(deleteErr) }()
waitCh <- goSignal{}

goCtx := goWorkflow.SaveToContext(context.Background())

deleteCtx, deleteCancel := context.WithTimeout(goCtx, cloudTimeout)
defer deleteCancel()
waitCh := make(chan goSignal)
//nolint:contextcheck // call is asynchronous; context is not inherited by design
go func() {
_, goWorkflow := workflow.New(ctx)

reportError := func(obj interface{}, err error) error {
azv := obj.(*azdiskv1beta2.AzVolume)
_ = r.updateError(azv, err)
_, derr := r.updateState(azv, azdiskv1beta2.VolumeDeletionFailed, forceUpdate)
return derr
}
var deleteErr error
defer func() { goWorkflow.Finish(deleteErr) }()
waitCh <- goSignal{}

var updateFunc azureutils.UpdateCRIFunc
var err error
updateMode := azureutils.UpdateCRIStatus
goCtx := goWorkflow.SaveToContext(context.Background())

// Delete all AzVolumeAttachment objects bound to the deleted AzVolume
var attachments []azdiskv1beta2.AzVolumeAttachment
attachments, err = r.cleanUpAzVolumeAttachmentByVolume(deleteCtx, azVolume.Name, azvolume, azureutils.AllRoles, mode)
if err != nil {
updateFunc = func(obj client.Object) error {
return reportError(obj, err)
var updateFunc azureutils.UpdateCRIFunc
var err error
updateMode := azureutils.UpdateCRI

deleteCtx, deleteCancel := context.WithTimeout(goCtx, cloudTimeout)
defer deleteCancel()

// Delete all AzVolumeAttachment objects bound to the deleted AzVolume
var attachments []azdiskv1beta2.AzVolumeAttachment
attachments, err = r.cleanUpAzVolumeAttachmentByVolume(deleteCtx, azVolume.Name, azvolume, azureutils.AllRoles, mode)
if err == nil {
var wg sync.WaitGroup
errors := make([]error, len(attachments))
numErrors := uint32(0)

// start waiting for replica AzVolumeAttachment CRIs to be deleted
for i, attachment := range attachments {
var waiter *watcher.ConditionWaiter
waiter, err = r.conditionWatcher.NewConditionWaiter(deleteCtx, watcher.AzVolumeAttachmentType, attachment.Name, verifyObjectFailedOrDeleted)
if err != nil {
break
}
} else {
var wg sync.WaitGroup
errors := make([]error, len(attachments))
numErrors := uint32(0)

// start waiting for replica AzVolumeAttachment CRIs to be deleted
for i, attachment := range attachments {
waiter, err := r.conditionWatcher.NewConditionWaiter(deleteCtx, watcher.AzVolumeAttachmentType, attachment.Name, verifyObjectFailedOrDeleted)
// wait async and report error to go channel
wg.Add(1)
go func(ctx context.Context, waiter *watcher.ConditionWaiter, i int) {
defer waiter.Close()
defer wg.Done()
_, err := waiter.Wait(ctx)
if err != nil {
updateFunc = func(obj client.Object) error {
return reportError(obj, err)
}
break
errors[i] = err
atomic.AddUint32(&numErrors, 1)
}
}(deleteCtx, waiter, i)
}

// wait async and report error to go channel
wg.Add(1)
go func(ctx context.Context, waiter *watcher.ConditionWaiter, i int) {
defer waiter.Close()
defer wg.Done()
_, err := waiter.Wait(ctx)
if err != nil {
errors[i] = err
atomic.AddUint32(&numErrors, 1)
}
}(deleteCtx, waiter, i)
}

wg.Wait()
// start waiting for the attachment clean up to complete (outside of the (if err == nil) to avoid child workflow finishing after parent workflow finishing)
wg.Wait()

if err == nil {
// if errors have been found with the wait calls, format the error msg and report via CRI
if numErrors > 0 {
var errMsgs []string
Expand All @@ -321,50 +307,40 @@ func (r *ReconcileAzVolume) triggerDelete(ctx context.Context, azVolume *azdiskv
}
}
err = status.Errorf(codes.Internal, strings.Join(errMsgs, ", "))
updateFunc = func(obj client.Object) error {
return reportError(obj, err)
}
}
}
}

if err == nil {
if volumeDeleteRequested {
cloudCtx, cloudCancel := context.WithTimeout(goCtx, cloudTimeout)
defer cloudCancel()
// if azVolumeAttachment clean up succeeded and volume needs to be deleted
if err == nil && volumeDeleteRequested {
cloudCtx, cloudCancel := context.WithTimeout(goCtx, cloudTimeout)
defer cloudCancel()

deleteErr = r.deleteVolume(cloudCtx, azVolume)
}
if deleteErr != nil {
updateFunc = func(obj client.Object) error {
azv := obj.(*azdiskv1beta2.AzVolume)
azv = r.updateError(azv, deleteErr)
_, derr := r.updateState(azv, azdiskv1beta2.VolumeDeletionFailed, forceUpdate)
return derr
}
} else {
updateMode = azureutils.UpdateAll
updateFunc = func(obj client.Object) error {
azv := obj.(*azdiskv1beta2.AzVolume)
_ = r.deleteFinalizer(azv, map[string]bool{consts.AzVolumeFinalizer: true})
return nil
}
}
}

//nolint:contextcheck // final status update of the CRI must occur even when the current context's deadline passes.
_, _ = azureutils.UpdateCRIWithRetry(goCtx, nil, r.cachedClient, r.azClient, azVolume, updateFunc, consts.ForcedUpdateMaxNetRetry, updateMode)
}()
<-waitCh
} else {
updateFunc := func(obj client.Object) error {
azv := obj.(*azdiskv1beta2.AzVolume)
_ = r.deleteFinalizer(azv, map[string]bool{consts.AzVolumeFinalizer: true})
return nil
err = r.deleteVolume(cloudCtx, azVolume)
}
if _, err = azureutils.UpdateCRIWithRetry(ctx, nil, r.cachedClient, r.azClient, azVolume, updateFunc, consts.NormalUpdateMaxNetRetry, azureutils.UpdateCRI); err != nil {
return err

// if any operation was unsuccessful, report the error
if err != nil {
updateMode = azureutils.UpdateCRIStatus
updateFunc = func(obj client.Object) error {
azV := obj.(*azdiskv1beta2.AzVolume)
_, derr := r.reportError(azV, azdiskv1beta2.VolumeDeletionFailed, err)
return derr
}
} else {
// if every operation was successful, delete the finalizer
updateFunc = func(obj client.Object) error {
azv := obj.(*azdiskv1beta2.AzVolume)
_ = r.deleteFinalizer(azv, map[string]bool{consts.AzVolumeFinalizer: true})
return nil
}
}
}

//nolint:contextcheck // final status update of the CRI must occur even when the current context's deadline passes.
_, _ = azureutils.UpdateCRIWithRetry(goCtx, nil, r.cachedClient, r.azClient, azVolume, updateFunc, consts.ForcedUpdateMaxNetRetry, updateMode)

}()
<-waitCh
return nil
}

Expand Down Expand Up @@ -409,8 +385,7 @@ func (r *ReconcileAzVolume) triggerUpdate(ctx context.Context, azVolume *azdiskv
if updateErr != nil {
updateFunc = func(obj client.Object) error {
azv := obj.(*azdiskv1beta2.AzVolume)
azv = r.updateError(azv, updateErr)
_, derr := r.updateState(azv, azdiskv1beta2.VolumeUpdateFailed, forceUpdate)
_, derr := r.reportError(azv, azdiskv1beta2.VolumeUpdateFailed, updateErr)
return derr
}
} else {
Expand Down Expand Up @@ -453,6 +428,14 @@ func (r *ReconcileAzVolume) deleteFinalizer(azVolume *azdiskv1beta2.AzVolume, fi
return azVolume
}

func (r *ReconcileAzVolume) reportError(azVolume *azdiskv1beta2.AzVolume, state azdiskv1beta2.AzVolumeState, err error) (*azdiskv1beta2.AzVolume, error) {
if azVolume == nil {
return nil, status.Errorf(codes.FailedPrecondition, "function `reportError` requires non-nil AzVolume object.")
}
azVolume = r.updateError(azVolume, err)
return r.updateState(azVolume, state, forceUpdate)
}

func (r *ReconcileAzVolume) updateState(azVolume *azdiskv1beta2.AzVolume, state azdiskv1beta2.AzVolumeState, mode updateMode) (*azdiskv1beta2.AzVolume, error) {
var err error
if azVolume == nil {
Expand Down Expand Up @@ -486,9 +469,7 @@ func (r *ReconcileAzVolume) updateError(azVolume *azdiskv1beta2.AzVolume, err er
if azVolume == nil {
return nil
}

azVolume.Status.Error = util.NewAzError(err)

return azVolume
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,11 @@ const (
pod operationRequester = "pod-controller"
)

type cleanUpMode int
type attachmentCleanUpMode int

const (
deleteCRIOnly cleanUpMode = iota
detachAndDeleteCRI
cleanUpAttachmentForUninstall attachmentCleanUpMode = iota
cleanUpAttachment
)

type updateMode int
Expand Down
12 changes: 6 additions & 6 deletions pkg/controller/shared_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -931,7 +931,7 @@ func (c *SharedState) createReplicaAzVolumeAttachment(ctx context.Context, volum
return nil
}

func (c *SharedState) cleanUpAzVolumeAttachmentByVolume(ctx context.Context, azVolumeName string, caller operationRequester, role azureutils.AttachmentRoleMode, deleteMode cleanUpMode) ([]azdiskv1beta2.AzVolumeAttachment, error) {
func (c *SharedState) cleanUpAzVolumeAttachmentByVolume(ctx context.Context, azVolumeName string, caller operationRequester, role azureutils.AttachmentRoleMode, deleteMode attachmentCleanUpMode) ([]azdiskv1beta2.AzVolumeAttachment, error) {
var err error
ctx, w := workflow.New(ctx, workflow.WithDetails(consts.VolumeNameLabel, azVolumeName))
defer func() { w.Finish(err) }()
Expand All @@ -956,7 +956,7 @@ func (c *SharedState) cleanUpAzVolumeAttachmentByVolume(ctx context.Context, azV
return attachments, nil
}

func (c *SharedState) cleanUpAzVolumeAttachmentByNode(ctx context.Context, azDriverNodeName string, caller operationRequester, role azureutils.AttachmentRoleMode, deleteMode cleanUpMode) ([]azdiskv1beta2.AzVolumeAttachment, error) {
func (c *SharedState) cleanUpAzVolumeAttachmentByNode(ctx context.Context, azDriverNodeName string, caller operationRequester, role azureutils.AttachmentRoleMode, deleteMode attachmentCleanUpMode) ([]azdiskv1beta2.AzVolumeAttachment, error) {
var err error
ctx, w := workflow.New(ctx, workflow.WithDetails(consts.NodeNameLabel, azDriverNodeName))
defer func() { w.Finish(err) }()
Expand Down Expand Up @@ -1001,7 +1001,7 @@ func (c *SharedState) cleanUpAzVolumeAttachmentByNode(ctx context.Context, azDri
return attachments.Items, nil
}

func (c *SharedState) cleanUpAzVolumeAttachments(ctx context.Context, attachments []azdiskv1beta2.AzVolumeAttachment, cleanUp cleanUpMode, caller operationRequester) error {
func (c *SharedState) cleanUpAzVolumeAttachments(ctx context.Context, attachments []azdiskv1beta2.AzVolumeAttachment, cleanUp attachmentCleanUpMode, caller operationRequester) error {
var err error

for _, attachment := range attachments {
Expand All @@ -1011,12 +1011,12 @@ func (c *SharedState) cleanUpAzVolumeAttachments(ctx context.Context, attachment
// if caller is azdrivernode, don't append cleanup annotation
if (caller != azdrivernode && !metav1.HasAnnotation(patched.ObjectMeta, consts.CleanUpAnnotation)) ||
// replica attachments should always be detached regardless of the cleanup mode
((cleanUp == detachAndDeleteCRI || patched.Spec.RequestedRole == azdiskv1beta2.ReplicaRole) && !metav1.HasAnnotation(patched.ObjectMeta, consts.VolumeDetachRequestAnnotation)) {
((cleanUp == cleanUpAttachment || patched.Spec.RequestedRole == azdiskv1beta2.ReplicaRole) && !metav1.HasAnnotation(patched.ObjectMeta, consts.VolumeDetachRequestAnnotation)) {
patchRequired = true
if caller != azdrivernode {
patched.Status.Annotations = azureutils.AddToMap(patched.Status.Annotations, consts.CleanUpAnnotation, string(caller))
}
if cleanUp == detachAndDeleteCRI || patched.Spec.RequestedRole == azdiskv1beta2.ReplicaRole {
if cleanUp == cleanUpAttachment || patched.Spec.RequestedRole == azdiskv1beta2.ReplicaRole {
patched.Status.Annotations = azureutils.AddToMap(patched.Status.Annotations, consts.VolumeDetachRequestAnnotation, string(caller))
}
}
Expand Down Expand Up @@ -1069,7 +1069,7 @@ func (c *SharedState) garbageCollectReplicas(ctx context.Context, volumeName str
volumeName,
replica,
func(ctx context.Context) error {
if _, err := c.cleanUpAzVolumeAttachmentByVolume(ctx, volumeName, requester, azureutils.ReplicaOnly, detachAndDeleteCRI); err != nil {
if _, err := c.cleanUpAzVolumeAttachmentByVolume(ctx, volumeName, requester, azureutils.ReplicaOnly, cleanUpAttachment); err != nil {
return err
}
c.addToGcExclusionList(volumeName, replica)
Expand Down

0 comments on commit ba91e4f

Please sign in to comment.