Skip to content

Commit

Permalink
[YUNIKORN-1842] Optimize updatePVCRefCounts (#629)
Browse files Browse the repository at this point in the history
Closes: #629

Signed-off-by: Craig Condit <ccondit@apache.org>
  • Loading branch information
FrankYang0529 authored and craigcondit committed Jul 12, 2023
1 parent 9d1158a commit db5e7b6
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 39 deletions.
57 changes: 18 additions & 39 deletions pkg/cache/external/scheduler_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ type SchedulerCache struct {
assumedPods map[string]bool // map of assumed pods, value indicates if pod volumes are all bound
pendingAllocations map[string]string // map of pod to node ID, presence indicates a pending allocation for scheduler
inProgressAllocations map[string]string // map of pod to node ID, presence indicates an in-process allocation for scheduler
pvcGeneration int64
pvcRefCounts map[string]map[string]int
lock sync.RWMutex
clients *client.Clients // client APIs
Expand Down Expand Up @@ -184,7 +183,7 @@ func (cache *SchedulerCache) updateNode(node *v1.Node) {
nodeInfo.SetNode(node)
cache.nodesInfoPodsWithAffinity = nil
cache.nodesInfoPodsWithReqAntiAffinity = nil
cache.updatePVCRefCounts()
cache.updatePVCRefCounts(nodeInfo, false)
}

func (cache *SchedulerCache) RemoveNode(node *v1.Node) {
Expand Down Expand Up @@ -216,7 +215,7 @@ func (cache *SchedulerCache) removeNode(node *v1.Node) {
cache.nodesInfo = nil
cache.nodesInfoPodsWithAffinity = nil
cache.nodesInfoPodsWithReqAntiAffinity = nil
cache.pvcGeneration = 0
cache.updatePVCRefCounts(nodeInfo, true)
}

func (cache *SchedulerCache) GetPriorityClass(name string) *schedulingv1.PriorityClass {
Expand Down Expand Up @@ -377,6 +376,7 @@ func (cache *SchedulerCache) updatePod(pod *v1.Pod) {
zap.String("nodeName", nodeName),
zap.Error(err))
}
cache.updatePVCRefCounts(nodeInfo, false)
cache.nodesInfoPodsWithAffinity = nil
cache.nodesInfoPodsWithReqAntiAffinity = nil
}
Expand Down Expand Up @@ -409,6 +409,7 @@ func (cache *SchedulerCache) updatePod(pod *v1.Pod) {
cache.assignedPods[key] = pod.Spec.NodeName
cache.nodesInfoPodsWithAffinity = nil
cache.nodesInfoPodsWithReqAntiAffinity = nil
cache.updatePVCRefCounts(nodeInfo, false)
}

// if pod is not in a terminal state, add it back into cache
Expand All @@ -423,7 +424,6 @@ func (cache *SchedulerCache) updatePod(pod *v1.Pod) {
delete(cache.pendingAllocations, key)
delete(cache.inProgressAllocations, key)
}
cache.updatePVCRefCounts()
}

// RemovePod removes a pod from the cache
Expand All @@ -449,6 +449,7 @@ func (cache *SchedulerCache) removePod(pod *v1.Pod) {
zap.Error(err))
}
}
cache.updatePVCRefCounts(nodeInfo, false)
}
delete(cache.podsMap, key)
delete(cache.assignedPods, key)
Expand All @@ -457,7 +458,6 @@ func (cache *SchedulerCache) removePod(pod *v1.Pod) {
delete(cache.inProgressAllocations, key)
cache.nodesInfoPodsWithAffinity = nil
cache.nodesInfoPodsWithReqAntiAffinity = nil
cache.updatePVCRefCounts()
}

func (cache *SchedulerCache) GetPod(uid string) (*v1.Pod, bool) {
Expand Down Expand Up @@ -603,46 +603,25 @@ func (cache *SchedulerCache) IsPVCUsedByPods(key string) bool {
return ok
}

func (cache *SchedulerCache) maxNodeGeneration() int64 {
var maxGeneration int64
for _, node := range cache.nodesMap {
if maxGeneration < node.Generation {
maxGeneration = node.Generation
func (cache *SchedulerCache) updatePVCRefCounts(node *framework.NodeInfo, removeNode bool) {
nodeName := node.Node().Name
for k, v := range cache.pvcRefCounts {
delete(v, nodeName)
if len(v) == 0 {
delete(cache.pvcRefCounts, k)
}
}
return maxGeneration
}

func (cache *SchedulerCache) updatePVCRefCounts() {
maxGeneration := cache.maxNodeGeneration()
updateAll := false
if cache.pvcGeneration == 0 {
// clear cache
updateAll = true
cache.pvcRefCounts = make(map[string]map[string]int)
} else if cache.pvcGeneration >= maxGeneration {
// cache is current
return
}
for nodeName, node := range cache.nodesMap {
if updateAll || node.Generation > cache.pvcGeneration {
for k, v := range cache.pvcRefCounts {
delete(v, nodeName)
if len(k) == 0 {
delete(cache.pvcRefCounts, k)
}
}
for k, count := range node.PVCRefCounts {
entry, ok := cache.pvcRefCounts[k]
if !ok {
entry = make(map[string]int)
cache.pvcRefCounts[k] = entry
}
entry[nodeName] = count
if !removeNode {
for k, count := range node.PVCRefCounts {
entry, ok := cache.pvcRefCounts[k]
if !ok {
entry = make(map[string]int)
cache.pvcRefCounts[k] = entry
}
entry[nodeName] = count
}
}
cache.pvcGeneration = maxGeneration
}

func (cache *SchedulerCache) GetSchedulerCacheDao() SchedulerCacheDao {
Expand Down
90 changes: 90 additions & 0 deletions pkg/cache/external/scheduler_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ const (
podName2 = "pod0002"
podUID1 = "Pod-UID-00001"
podUID2 = "Pod-UID-00002"
pvcName1 = "pvc0001"
pvcName2 = "pvc0002"
)

// this test verifies that no matter which comes first, pod or node,
Expand Down Expand Up @@ -975,3 +977,91 @@ func expectHost(t *testing.T, host string, nodesInfo []*framework.NodeInfo) {
assert.Equal(t, 1, len(nodesInfo), "nodes list size")
assert.Equal(t, host, nodesInfo[0].Node().Name)
}

// TestUpdatePVCRefCounts verifies incremental PVC ref-count maintenance:
// counts appear when a pod with a PVC lands on a node, are absent for
// unassigned pods, and are cleaned up on pod or node removal.
func TestUpdatePVCRefCounts(t *testing.T) {
	cache := NewSchedulerCache(client.NewMockedAPIProvider(false).GetAPIs())

	// Both test nodes advertise the same allocatable resources.
	allocatable := map[v1.ResourceName]resource.Quantity{
		v1.ResourceName("memory"): *resource.NewQuantity(1024*1000*1000, resource.DecimalSI),
		v1.ResourceName("cpu"):    *resource.NewQuantity(10, resource.DecimalSI),
	}

	node1 := &v1.Node{
		ObjectMeta: apis.ObjectMeta{
			Name:      host1,
			Namespace: "default",
			UID:       nodeUID1,
		},
		Status: v1.NodeStatus{Allocatable: allocatable},
		Spec:   v1.NodeSpec{Unschedulable: false},
	}
	node2 := &v1.Node{
		ObjectMeta: apis.ObjectMeta{
			Name:      host2,
			Namespace: "default",
			UID:       nodeUID2,
		},
		Status: v1.NodeStatus{Allocatable: allocatable},
		Spec:   v1.NodeSpec{Unschedulable: false},
	}
	cache.AddNode(node1)
	cache.AddNode(node2)

	// Base pod; each case below customizes a deep copy of it.
	basePod := &v1.Pod{
		TypeMeta: apis.TypeMeta{
			Kind:       "Pod",
			APIVersion: "v1",
		},
		ObjectMeta: apis.ObjectMeta{
			Namespace:   "default",
			Annotations: map[string]string{"state": "new"},
		},
		Spec: v1.PodSpec{},
	}

	// A pod bound to node1 and mounting pvc1 must register the PVC.
	pod1 := basePod.DeepCopy()
	pod1.ObjectMeta.Name = podName1
	pod1.ObjectMeta.UID = podUID1
	pod1.Spec.NodeName = node1.Name
	pod1.Spec.Volumes = []v1.Volume{{
		Name:         pvcName1,
		VolumeSource: v1.VolumeSource{PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ClaimName: pvcName1}},
	}}
	cache.AddPod(pod1)
	assert.Check(t, cache.IsPVCUsedByPods(framework.GetNamespacedName(pod1.Namespace, pvcName1)), "pvc1 is not in pvcRefCounts")

	// A pod with no node assignment must not contribute any PVC counts.
	pod2 := basePod.DeepCopy()
	pod2.ObjectMeta.Name = podName2
	pod2.ObjectMeta.UID = podUID2
	pod2.Spec.Volumes = []v1.Volume{{
		Name:         pvcName2,
		VolumeSource: v1.VolumeSource{PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ClaimName: pvcName2}},
	}}
	cache.AddPod(pod2)
	assert.Check(t, !cache.IsPVCUsedByPods(framework.GetNamespacedName(pod2.Namespace, pvcName2)), "pvc2 is in pvcRefCounts")

	// Binding pod2 to node2 must bring pvc2 into the ref counts.
	boundPod2 := pod2.DeepCopy()
	boundPod2.Spec.NodeName = node2.Name
	cache.UpdatePod(boundPod2)
	assert.Check(t, cache.IsPVCUsedByPods(framework.GetNamespacedName(pod2.Namespace, pvcName2)), "pvc2 is not in pvcRefCounts")

	// Removing pod1 must release pvc1.
	cache.RemovePod(pod1)
	assert.Check(t, !cache.IsPVCUsedByPods(framework.GetNamespacedName(pod1.Namespace, pvcName1)), "pvc1 is in pvcRefCounts")

	// Removing node2 must release every PVC it referenced.
	cache.RemoveNode(node2)
	assert.Check(t, !cache.IsPVCUsedByPods(framework.GetNamespacedName(pod2.Namespace, pvcName2)), "pvc2 is in pvcRefCounts")
}

0 comments on commit db5e7b6

Please sign in to comment.