Skip to content

Commit

Permalink
fix: add nodeDiskAvailabilityMap
Browse files Browse the repository at this point in the history
  • Loading branch information
alice-zheyan-yu committed Nov 15, 2022
1 parent 4bb1bb4 commit 395e233
Show file tree
Hide file tree
Showing 10 changed files with 477 additions and 15 deletions.
4 changes: 4 additions & 0 deletions pkg/controller/attach_detach.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ func (r *ReconcileAttachDetach) Reconcile(ctx context.Context, request reconcile
if err := r.removeFinalizer(ctx, azVolumeAttachment); err != nil {
return reconcileReturnOnError(ctx, azVolumeAttachment, "delete", err, r.retryInfo)
}
// deletion of azVolumeAttachment is succeeded, the node's remaining capacity of disk attachment should be increased by 1
r.incrementAttachmentCount(ctx, azVolumeAttachment.Spec.NodeName)
// detachment request
} else if volumeDetachRequested(azVolumeAttachment) {
if err := r.triggerDetach(ctx, azVolumeAttachment); err != nil {
Expand Down Expand Up @@ -282,6 +284,8 @@ func (r *ReconcileAttachDetach) triggerAttach(ctx context.Context, azVolumeAttac
r.eventRecorder.Eventf(pod.DeepCopyObject(), v1.EventTypeNormal, consts.ReplicaAttachmentSuccessEvent, "Replica mount for volume %s successfully attached to node %s", azVolumeAttachment.Spec.VolumeName, azVolumeAttachment.Spec.NodeName)
}
}
// the node's remaining capacity of disk attachment should be decreased by 1, since the disk attachment is succeeded.
r.decrementAttachmentCount(ctx, azVolumeAttachment.Spec.NodeName)
}

updateFunc := func(obj client.Object) error {
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/attach_detach_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func TestAttachDetachReconcile(t *testing.T) {
newAttachment)

controller.azVolumeAttachmentToVaMap.Store(newAttachment.Name, newVolumeAttachment.Name)
addTestNodeInAvailableAttachmentsMap(*controller.SharedState, newAttachment.Spec.NodeName)

mockClientsAndAttachmentProvisioner(controller)

Expand Down Expand Up @@ -137,6 +138,7 @@ func TestAttachDetachReconcile(t *testing.T) {
newVolumeAttachment,
newAttachment)

addTestNodeInAvailableAttachmentsMap(*controller.SharedState, newAttachment.Spec.NodeName)
mockClientsAndAttachmentProvisioner(controller)

return controller
Expand Down Expand Up @@ -168,6 +170,7 @@ func TestAttachDetachReconcile(t *testing.T) {
&testAzVolume0,
newAttachment)

addTestNodeInAvailableAttachmentsMap(*controller.SharedState, newAttachment.Spec.NodeName)
mockClientsAndAttachmentProvisioner(controller)

return controller
Expand Down
10 changes: 8 additions & 2 deletions pkg/controller/azdrivernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,12 @@ func (r *ReconcileAzDriverNode) Reconcile(ctx context.Context, request reconcile
n := &corev1.Node{}
err := r.cachedClient.Get(ctx, request.NamespacedName, n)

// If the node still exists don't delete the AzDriverNode
if err == nil {
if n.ObjectMeta.DeletionTimestamp == nil {
// for create event, add the new node in availableAttachmentsMap
r.addNodeToAvailableAttachmentsMap(ctx, request.Name)
}
// for delete even, if the node still exists don't delete the AzDriverNode
return reconcile.Result{}, nil
}

Expand All @@ -70,6 +74,7 @@ func (r *ReconcileAzDriverNode) Reconcile(ctx context.Context, request reconcile
if err != nil && !errors.IsNotFound(err) {
return reconcile.Result{Requeue: true}, err
}
r.deleteNodeFromAvailableAttachmentsMap(ctx, request.Name)

// Delete all volumeAttachments attached to this node, if failed, requeue
if _, err = r.cleanUpAzVolumeAttachmentByNode(ctx, request.Name, azdrivernode, azureutils.AllRoles, cleanUpAttachment); err != nil {
Expand Down Expand Up @@ -101,6 +106,7 @@ func (r *ReconcileAzDriverNode) Recover(ctx context.Context) error {
if _, err = r.azClient.DiskV1beta2().AzDriverNodes(r.objectNamespace).Update(ctx, updated, metav1.UpdateOptions{}); err != nil {
return err
}
r.addNodeToAvailableAttachmentsMap(ctx, node.Name)
}
return nil
}
Expand All @@ -127,7 +133,7 @@ func NewAzDriverNodeController(mgr manager.Manager, controllerSharedState *Share
// Predicate to only reconcile deleted nodes
p := predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
return false
return true
},
UpdateFunc: func(e event.UpdateEvent) bool {
return false
Expand Down
56 changes: 51 additions & 5 deletions pkg/controller/azdrivernode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2/klogr"
azdiskfakes "sigs.k8s.io/azuredisk-csi-driver/pkg/apis/client/clientset/versioned/fake"
Expand Down Expand Up @@ -131,9 +132,12 @@ func TestAzDriverNodeControllerReconcile(t *testing.T) {
&testAzDriverNode1)

controller.cachedClient.(*mockclient.MockClient).EXPECT().
Get(gomock.Any(), testNode1Request.NamespacedName, gomock.Any()).
Return(nil).
AnyTimes()
Get(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).AnyTimes()

controller.cachedClient.(*mockclient.MockClient).EXPECT().
List(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).AnyTimes()

return controller
},
Expand Down Expand Up @@ -170,6 +174,34 @@ func TestAzDriverNodeControllerReconcile(t *testing.T) {
require.NoError(t, err2)
},
},
{
description: "[Success] Should add new AzDriverNode in availableAttachmentsMap",
request: testNode1Request,
setupFunc: func(t *testing.T, mockCtl *gomock.Controller) *ReconcileAzDriverNode {
controller := NewTestAzDriverNodeController(
mockCtl,
testNamespace,
&testAzDriverNode0)

controller.cachedClient.(*mockclient.MockClient).EXPECT().
Get(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).
AnyTimes()

mockClients(controller.cachedClient.(*mockclient.MockClient), controller.azClient, nil)

return controller
},
verifyFunc: func(t *testing.T, controller *ReconcileAzDriverNode, result reconcile.Result, err error) {
require.NoError(t, err)
require.False(t, result.Requeue)

_, err2 := controller.azClient.DiskV1beta2().AzDriverNodes(testNamespace).Get(context.TODO(), testNode0Name, metav1.GetOptions{})
require.NoError(t, err2)
_, nodeExists := controller.availableAttachmentsMap.Load(testNode1Name)
require.True(t, nodeExists)
},
},
}

for _, test := range tests {
Expand Down Expand Up @@ -200,8 +232,18 @@ func TestAzDriverNodeRecover(t *testing.T) {
&testAzDriverNode1)

controller.cachedClient.(*mockclient.MockClient).EXPECT().
Get(gomock.Any(), testNode1Request.NamespacedName, gomock.Any()).
Return(testNode1ServerTimeoutError).
Get(gomock.Any(), types.NamespacedName{Name: testAzDriverNode0.Name}, gomock.Any()).
Return(nil).
AnyTimes()

controller.cachedClient.(*mockclient.MockClient).EXPECT().
Get(gomock.Any(), types.NamespacedName{Name: testAzDriverNode1.Name}, gomock.Any()).
Return(nil).
AnyTimes()

controller.cachedClient.(*mockclient.MockClient).EXPECT().
List(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).
AnyTimes()

return controller
Expand All @@ -225,6 +267,10 @@ func TestAzDriverNodeRecover(t *testing.T) {
defer mockCtl.Finish()
controller := tt.setupFunc(t, mockCtl)
err := controller.Recover(context.TODO())
_, node0Exists := controller.availableAttachmentsMap.Load(testAzDriverNode0.Name)
require.True(t, node0Exists)
_, node1Exists := controller.availableAttachmentsMap.Load(testAzDriverNode1.Name)
require.True(t, node1Exists)
tt.verifyFunc(t, controller, err)
})
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/controller/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"reflect"
"strings"
"sync/atomic"
"time"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -751,3 +752,9 @@ func mockClients(mockClient *mockclient.MockClient, azVolumeClient azdisk.Interf
}).
AnyTimes()
}

func addTestNodeInAvailableAttachmentsMap(sharedState SharedState, nodeName string) {
var count atomic.Int32
count.Store(int32(8))
sharedState.availableAttachmentsMap.Store(nodeName, &count)
}
5 changes: 5 additions & 0 deletions pkg/controller/node_availability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controller

import (
"context"
"sync/atomic"
"testing"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -81,6 +82,10 @@ func TestNodeAvailabilityController(t *testing.T) {
newPod,
)

var count atomic.Int32
count.Store(int32(8))
controller.availableAttachmentsMap.Store(testSchedulableNode1.Name, &count)

mockClients(controller.cachedClient.(*mockclient.MockClient), controller.azClient, controller.kubeClient)
controller.priorityReplicaRequestsQueue.Push(context.TODO(), &ReplicaRequest{VolumeName: testPersistentVolume0Name, Priority: 1})

Expand Down
46 changes: 46 additions & 0 deletions pkg/controller/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controller

import (
"context"
"sync/atomic"
"testing"

"github.com/Azure/go-autorest/autorest/to"
Expand Down Expand Up @@ -79,6 +80,8 @@ func TestPodReconcile(t *testing.T) {
&testNode0,
)

addTestNodeInAvailableAttachmentsMap(*controller.SharedState, testNode0.Name)

mockClients(controller.cachedClient.(*mockclient.MockClient), controller.azClient, controller.kubeClient)
return controller
},
Expand Down Expand Up @@ -123,6 +126,14 @@ func TestPodReconcile(t *testing.T) {
&testPersistentVolume0,
newPod)

// addTestNodeInAvailableAttachmentsMap(*controller.SharedState, testNode0.Name)
// addTestNodeInAvailableAttachmentsMap(*controller.SharedState, testNode1.Name)
var count atomic.Int32
count.Store(int32(8))
controller.availableAttachmentsMap.Store(testNode0.Name, &count)
var count1 atomic.Int32
count1.Store(int32(8))
controller.availableAttachmentsMap.Store(testNode1.Name, &count1)
mockClients(controller.cachedClient.(*mockclient.MockClient), controller.azClient, controller.kubeClient)
return controller
},
Expand Down Expand Up @@ -177,6 +188,18 @@ func TestPodReconcile(t *testing.T) {
&testNode1,
&testNode2,
newPod)
var count atomic.Int32
count.Store(int32(8))
controller.availableAttachmentsMap.Store(testNode0.Name, &count)
var count1 atomic.Int32
count1.Store(int32(8))
controller.availableAttachmentsMap.Store(testNode1.Name, &count1)
var count2 atomic.Int32
count2.Store(int32(8))
controller.availableAttachmentsMap.Store(testNode2.Name, &count2)
// addTestNodeInAvailableAttachmentsMap(*controller.SharedState, testNode0.Name)
// addTestNodeInAvailableAttachmentsMap(*controller.SharedState, testNode1.Name)
// addTestNodeInAvailableAttachmentsMap(*controller.SharedState, testNode2.Name)

mockClients(controller.cachedClient.(*mockclient.MockClient), controller.azClient, controller.kubeClient)
return controller
Expand Down Expand Up @@ -240,6 +263,18 @@ func TestPodReconcile(t *testing.T) {
newPod0,
newPod1)

// addTestNodeInAvailableAttachmentsMap(*controller.SharedState, testNode0.Name)
// addTestNodeInAvailableAttachmentsMap(*controller.SharedState, testNode1.Name)
// addTestNodeInAvailableAttachmentsMap(*controller.SharedState, testNode2.Name)
var count atomic.Int32
count.Store(int32(8))
controller.availableAttachmentsMap.Store(testNode0.Name, &count)
var count1 atomic.Int32
count1.Store(int32(8))
controller.availableAttachmentsMap.Store(testNode1.Name, &count1)
var count2 atomic.Int32
count2.Store(int32(8))
controller.availableAttachmentsMap.Store(testNode2.Name, &count2)
mockClients(controller.cachedClient.(*mockclient.MockClient), controller.azClient, controller.kubeClient)
result, err := controller.Reconcile(context.TODO(), testPod0Request)
require.False(t, result.Requeue)
Expand Down Expand Up @@ -289,6 +324,9 @@ func TestPodReconcile(t *testing.T) {
&testPersistentVolume0,
newPod0)

addTestNodeInAvailableAttachmentsMap(*controller.SharedState, testNode0.Name)
addTestNodeInAvailableAttachmentsMap(*controller.SharedState, testNode1.Name)

mockClients(controller.cachedClient.(*mockclient.MockClient), controller.azClient, controller.kubeClient)

result, err := controller.Reconcile(context.TODO(), testPod0Request)
Expand Down Expand Up @@ -375,6 +413,14 @@ func TestPodRecover(t *testing.T) {
newAttachment1,
newPod)

// addTestNodeInAvailableAttachmentsMap(*controller.SharedState, testNode0.Name)
// addTestNodeInAvailableAttachmentsMap(*controller.SharedState, testNode1.Name)
var count atomic.Int32
count.Store(int32(8))
controller.availableAttachmentsMap.Store(testNode0.Name, &count)
var count1 atomic.Int32
count1.Store(int32(8))
controller.availableAttachmentsMap.Store(testNode1.Name, &count1)
mockClients(controller.cachedClient.(*mockclient.MockClient), controller.azClient, controller.kubeClient)
return controller
},
Expand Down
Loading

0 comments on commit 395e233

Please sign in to comment.