diff --git a/pkg/framework/plugins/removepodsviolatingtopologyspreadconstraint/topologyspreadconstraint.go b/pkg/framework/plugins/removepodsviolatingtopologyspreadconstraint/topologyspreadconstraint.go
index 8eed61828d..acad9f6718 100644
--- a/pkg/framework/plugins/removepodsviolatingtopologyspreadconstraint/topologyspreadconstraint.go
+++ b/pkg/framework/plugins/removepodsviolatingtopologyspreadconstraint/topologyspreadconstraint.go
@@ -29,7 +29,7 @@ import (
 	utilpointer "k8s.io/utils/pointer"
 	v1helper "k8s.io/component-helpers/scheduling/corev1"
-	nodeaffinity "k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
+	"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
 	"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
 	"sigs.k8s.io/descheduler/pkg/descheduler/node"
 	podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
@@ -50,6 +50,19 @@ type topology struct {
 	pods []*v1.Pod
 }
 
+// topologySpreadConstraint is an internal version for v1.TopologySpreadConstraint
+// and where the selector is parsed.
+// This mirrors scheduler: https://github.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/framework/plugins/podtopologyspread/common.go#L37
+type topologySpreadConstraint struct {
+	maxSkew            int32
+	topologyKey        string
+	selector           labels.Selector
+	nodeAffinityPolicy v1.NodeInclusionPolicy
+	nodeTaintsPolicy   v1.NodeInclusionPolicy
+	podNodeAffinity    nodeaffinity.RequiredNodeAffinity
+	podTolerations     []v1.Toleration
+}
+
 // RemovePodsViolatingTopologySpreadConstraint evicts pods which violate their topology spread constraints
 type RemovePodsViolatingTopologySpreadConstraint struct {
 	handle    frameworktypes.Handle
@@ -81,12 +94,6 @@ func New(args runtime.Object, handle frameworktypes.Handle) (frameworktypes.Plug
 	}, nil
 }
 
-type topologyConstraintSet struct {
-	constraint      v1.TopologySpreadConstraint
-	podNodeAffinity nodeaffinity.RequiredNodeAffinity
-	podTolerations  []v1.Toleration
-}
-
 // Name retrieves the plugin name
 func (d *RemovePodsViolatingTopologySpreadConstraint) Name() string {
 	return PluginName
@@ -140,7 +147,7 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) Balance(ctx context.Contex
 		}
 
 		// ...where there is a topology constraint
-		namespaceTopologySpreadConstraints := []topologyConstraintSet{}
+		var namespaceTopologySpreadConstraints []topologySpreadConstraint
 		for _, pod := range namespacedPods[namespace] {
 			for _, constraint := range pod.Spec.TopologySpreadConstraints {
 				// Ignore topology constraints if they are not included
@@ -148,10 +155,14 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) Balance(ctx context.Contex
 					continue
 				}
 
-				namespaceTopologySpreadConstraint := newTopologyConstraintSet(constraint, pod)
+				namespaceTopologySpreadConstraint, err := newTopologySpreadConstraint(constraint, pod)
+				if err != nil {
+					klog.ErrorS(err, "cannot process topology spread constraint")
+					continue
+				}
 
-				// Need to check v1.TopologySpreadConstraint deepEquality because
-				// v1.TopologySpreadConstraint has pointer fields
+				// Need to check TopologySpreadConstraint deepEquality because
+				// TopologySpreadConstraint can have pointer fields
 				// and we don't need to go over duplicated constraints later on
 				if hasIdenticalConstraints(namespaceTopologySpreadConstraint, namespaceTopologySpreadConstraints) {
 					continue
@@ -164,27 +175,18 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) Balance(ctx context.Contex
 		}
 
 		// 2.
for each topologySpreadConstraint in that namespace - for _, constraintSet := range namespaceTopologySpreadConstraints { - constraint := constraintSet.constraint - nodeAffinity := constraintSet.podNodeAffinity - tolerations := constraintSet.podTolerations + for _, tsc := range namespaceTopologySpreadConstraints { constraintTopologies := make(map[topologyPair][]*v1.Pod) // pre-populate the topologyPair map with all the topologies available from the nodeMap // (we can't just build it from existing pods' nodes because a topology may have 0 pods) for _, node := range nodeMap { - if val, ok := node.Labels[constraint.TopologyKey]; ok { - if matchNodeInclusionPolicies(&constraint, tolerations, node, nodeAffinity) { - constraintTopologies[topologyPair{key: constraint.TopologyKey, value: val}] = make([]*v1.Pod, 0) + if val, ok := node.Labels[tsc.topologyKey]; ok { + if matchNodeInclusionPolicies(tsc, node) { + constraintTopologies[topologyPair{key: tsc.topologyKey, value: val}] = make([]*v1.Pod, 0) } } } - selector, err := metav1.LabelSelectorAsSelector(constraint.LabelSelector) - if err != nil { - klog.ErrorS(err, "Couldn't parse label selector as selector", "selector", constraint.LabelSelector) - continue - } - // 3. for each evictable pod in that namespace // (this loop is where we count the number of pods per topologyValue that match this constraint's selector) var sumPods float64 @@ -194,7 +196,7 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) Balance(ctx context.Contex continue } // 4. if the pod matches this TopologySpreadConstraint LabelSelector - if !selector.Matches(labels.Set(pod.Labels)) { + if !tsc.selector.Matches(labels.Set(pod.Labels)) { continue } @@ -204,21 +206,21 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) Balance(ctx context.Contex // If ok is false, node is nil in which case node.Labels will panic. In which case a pod is yet to be scheduled. So it's safe to just continue here. continue } - nodeValue, ok := node.Labels[constraint.TopologyKey] + nodeValue, ok := node.Labels[tsc.topologyKey] if !ok { continue } // 6. create a topoPair with key as this TopologySpreadConstraint - topoPair := topologyPair{key: constraint.TopologyKey, value: nodeValue} + topoPair := topologyPair{key: tsc.topologyKey, value: nodeValue} // 7. 
add the pod with key as this topoPair constraintTopologies[topoPair] = append(constraintTopologies[topoPair], pod) sumPods++ } - if topologyIsBalanced(constraintTopologies, constraint) { - klog.V(2).InfoS("Skipping topology constraint because it is already balanced", "constraint", constraint) + if topologyIsBalanced(constraintTopologies, tsc) { + klog.V(2).InfoS("Skipping topology constraint because it is already balanced", "constraint", tsc) continue } - d.balanceDomains(podsForEviction, constraintSet, constraintTopologies, sumPods, nodes) + d.balanceDomains(podsForEviction, tsc, constraintTopologies, sumPods, nodes) } } @@ -243,7 +245,7 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) Balance(ctx context.Contex } // hasIdenticalConstraints checks if we already had an identical TopologySpreadConstraint in namespaceTopologySpreadConstraints slice -func hasIdenticalConstraints(newConstraint topologyConstraintSet, namespaceTopologySpreadConstraints []topologyConstraintSet) bool { +func hasIdenticalConstraints(newConstraint topologySpreadConstraint, namespaceTopologySpreadConstraints []topologySpreadConstraint) bool { for _, constraint := range namespaceTopologySpreadConstraints { if reflect.DeepEqual(newConstraint, constraint) { return true @@ -254,7 +256,7 @@ func hasIdenticalConstraints(newConstraint topologyConstraintSet, namespaceTopol // topologyIsBalanced checks if any domains in the topology differ by more than the MaxSkew // this is called before any sorting or other calculations and is used to skip topologies that don't need to be balanced -func topologyIsBalanced(topology map[topologyPair][]*v1.Pod, constraint v1.TopologySpreadConstraint) bool { +func topologyIsBalanced(topology map[topologyPair][]*v1.Pod, tsc topologySpreadConstraint) bool { minDomainSize := math.MaxInt32 maxDomainSize := math.MinInt32 for _, pods := range topology { @@ -264,7 +266,7 @@ func topologyIsBalanced(topology map[topologyPair][]*v1.Pod, constraint v1.Topol if len(pods) > maxDomainSize { maxDomainSize = len(pods) } - if int32(maxDomainSize-minDomainSize) > constraint.MaxSkew { + if int32(maxDomainSize-minDomainSize) > tsc.maxSkew { return false } } @@ -293,20 +295,19 @@ func topologyIsBalanced(topology map[topologyPair][]*v1.Pod, constraint v1.Topol // (assuming even distribution by the scheduler of the evicted pods) func (d *RemovePodsViolatingTopologySpreadConstraint) balanceDomains( podsForEviction map[*v1.Pod]struct{}, - constraintSet topologyConstraintSet, + tsc topologySpreadConstraint, constraintTopologies map[topologyPair][]*v1.Pod, sumPods float64, nodes []*v1.Node, ) { - constraint := constraintSet.constraint idealAvg := sumPods / float64(len(constraintTopologies)) isEvictable := d.handle.Evictor().Filter sortedDomains := sortDomains(constraintTopologies, isEvictable) getPodsAssignedToNode := d.handle.GetPodsAssignedToNodeFunc() topologyBalanceNodeFit := utilpointer.BoolDeref(d.args.TopologyBalanceNodeFit, true) - eligibleNodes := filterEligibleNodes(nodes, constraintSet) - nodesBelowIdealAvg := filterNodesBelowIdealAvg(eligibleNodes, sortedDomains, constraint.TopologyKey, idealAvg) + eligibleNodes := filterEligibleNodes(nodes, tsc) + nodesBelowIdealAvg := filterNodesBelowIdealAvg(eligibleNodes, sortedDomains, tsc.topologyKey, idealAvg) // i is the index for belowOrEqualAvg // j is the index for aboveAvg @@ -322,7 +323,7 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) balanceDomains( skew := float64(len(sortedDomains[j].pods) - len(sortedDomains[i].pods)) // if k and j are 
within the maxSkew of each other, move to next belowOrEqualAvg - if int32(skew) <= constraint.MaxSkew { + if int32(skew) <= tsc.maxSkew { i++ continue } @@ -336,7 +337,7 @@ func (d *RemovePodsViolatingTopologySpreadConstraint) balanceDomains( aboveAvg := math.Ceil(float64(len(sortedDomains[j].pods)) - idealAvg) belowAvg := math.Ceil(idealAvg - float64(len(sortedDomains[i].pods))) smallestDiff := math.Min(aboveAvg, belowAvg) - halfSkew := math.Ceil((skew - float64(constraint.MaxSkew)) / 2) + halfSkew := math.Ceil((skew - float64(tsc.maxSkew)) / 2) movePods := int(math.Min(smallestDiff, halfSkew)) if movePods <= 0 { i++ @@ -463,52 +464,82 @@ func doNotScheduleTaintsFilterFunc() func(t *v1.Taint) bool { } } -func filterEligibleNodes(nodes []*v1.Node, constraintSet topologyConstraintSet) []*v1.Node { - constraint := constraintSet.constraint - nodeAffinity := constraintSet.podNodeAffinity - tolerations := constraintSet.podTolerations +func filterEligibleNodes(nodes []*v1.Node, tsc topologySpreadConstraint) []*v1.Node { var eligibleNodes []*v1.Node for _, node := range nodes { - if matchNodeInclusionPolicies(&constraint, tolerations, node, nodeAffinity) { + if matchNodeInclusionPolicies(tsc, node) { eligibleNodes = append(eligibleNodes, node) } } return eligibleNodes } -func matchNodeInclusionPolicies(tsc *v1.TopologySpreadConstraint, tolerations []v1.Toleration, node *v1.Node, require nodeaffinity.RequiredNodeAffinity) bool { - // Nil is equivalent to honor - if tsc.NodeAffinityPolicy == nil || *tsc.NodeAffinityPolicy == v1.NodeInclusionPolicyHonor { +func matchNodeInclusionPolicies(tsc topologySpreadConstraint, node *v1.Node) bool { + if tsc.nodeAffinityPolicy == v1.NodeInclusionPolicyHonor { // We ignore parsing errors here for backwards compatibility. 
- if match, _ := require.Match(node); !match { + if match, _ := tsc.podNodeAffinity.Match(node); !match { return false } } - // Nil is equivalent to ignore - if tsc.NodeTaintsPolicy != nil && *tsc.NodeTaintsPolicy == v1.NodeInclusionPolicyHonor { - if _, untolerated := v1helper.FindMatchingUntoleratedTaint(node.Spec.Taints, tolerations, doNotScheduleTaintsFilterFunc()); untolerated { + if tsc.nodeTaintsPolicy == v1.NodeInclusionPolicyHonor { + if _, untolerated := v1helper.FindMatchingUntoleratedTaint(node.Spec.Taints, tsc.podTolerations, doNotScheduleTaintsFilterFunc()); untolerated { return false } } return true } -func newTopologyConstraintSet(constraint v1.TopologySpreadConstraint, pod *v1.Pod) topologyConstraintSet { - if pod.Labels != nil && len(constraint.MatchLabelKeys) > 0 { - if constraint.LabelSelector == nil { - constraint.LabelSelector = &metav1.LabelSelector{} - } +// inspired by Scheduler: https://github.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/framework/plugins/podtopologyspread/common.go#L90 +func newTopologySpreadConstraint(constraint v1.TopologySpreadConstraint, pod *v1.Pod) (topologySpreadConstraint, error) { + selector, err := metav1.LabelSelectorAsSelector(constraint.LabelSelector) + if err != nil { + return topologySpreadConstraint{}, err + } + if len(constraint.MatchLabelKeys) > 0 && pod.Labels != nil { + matchLabels := make(labels.Set) for _, labelKey := range constraint.MatchLabelKeys { - metav1.AddLabelToSelector(constraint.LabelSelector, labelKey, pod.Labels[labelKey]) + if value, ok := pod.Labels[labelKey]; ok { + matchLabels[labelKey] = value + } } + if len(matchLabels) > 0 { + selector = mergeLabelSetWithSelector(matchLabels, selector) + } + } + + tsc := topologySpreadConstraint{ + maxSkew: constraint.MaxSkew, + topologyKey: constraint.TopologyKey, + selector: selector, + nodeAffinityPolicy: v1.NodeInclusionPolicyHonor, // If NodeAffinityPolicy is nil, we treat NodeAffinityPolicy as "Honor". + nodeTaintsPolicy: v1.NodeInclusionPolicyIgnore, // If NodeTaintsPolicy is nil, we treat NodeTaintsPolicy as "Ignore". 
+ podNodeAffinity: nodeaffinity.GetRequiredNodeAffinity(pod), + podTolerations: pod.Spec.Tolerations, + } + if constraint.NodeAffinityPolicy != nil { + tsc.nodeAffinityPolicy = *constraint.NodeAffinityPolicy + } + if constraint.NodeTaintsPolicy != nil { + tsc.nodeTaintsPolicy = *constraint.NodeTaintsPolicy + } + + return tsc, nil +} + +// Scheduler: https://github.com/kubernetes/kubernetes/blob/release-1.28/pkg/scheduler/framework/plugins/podtopologyspread/common.go#L136 +func mergeLabelSetWithSelector(matchLabels labels.Set, s labels.Selector) labels.Selector { + mergedSelector := labels.SelectorFromSet(matchLabels) + + requirements, ok := s.Requirements() + if !ok { + return s } - requiredSchedulingTerm := nodeaffinity.GetRequiredNodeAffinity(pod) - return topologyConstraintSet{ - constraint: constraint, - podNodeAffinity: requiredSchedulingTerm, - podTolerations: pod.Spec.Tolerations, + for _, r := range requirements { + mergedSelector = mergedSelector.Add(r) } + + return mergedSelector } diff --git a/pkg/framework/plugins/removepodsviolatingtopologyspreadconstraint/topologyspreadconstraint_test.go b/pkg/framework/plugins/removepodsviolatingtopologyspreadconstraint/topologyspreadconstraint_test.go index e97485cef9..67aab19c2d 100644 --- a/pkg/framework/plugins/removepodsviolatingtopologyspreadconstraint/topologyspreadconstraint_test.go +++ b/pkg/framework/plugins/removepodsviolatingtopologyspreadconstraint/topologyspreadconstraint_test.go @@ -16,7 +16,6 @@ import ( "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" "k8s.io/client-go/tools/events" - "k8s.io/component-helpers/scheduling/corev1/nodeaffinity" utilpointer "k8s.io/utils/pointer" "sigs.k8s.io/descheduler/pkg/api" @@ -1606,30 +1605,29 @@ func getDefaultTopologyConstraintsWithPodTemplateHashMatch(maxSkew int32) []v1.T } func TestCheckIdenticalConstraints(t *testing.T) { - newConstraintSame := v1.TopologySpreadConstraint{ - MaxSkew: 2, - TopologyKey: "zone", - WhenUnsatisfiable: v1.DoNotSchedule, - LabelSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}, + selector, _ := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}) + + newConstraintSame := topologySpreadConstraint{ + maxSkew: 2, + topologyKey: "zone", + selector: selector.DeepCopySelector(), } - newConstraintDifferent := v1.TopologySpreadConstraint{ - MaxSkew: 3, - TopologyKey: "node", - WhenUnsatisfiable: v1.DoNotSchedule, - LabelSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}, + newConstraintDifferent := topologySpreadConstraint{ + maxSkew: 3, + topologyKey: "node", + selector: selector.DeepCopySelector(), } - namespaceTopologySpreadConstraint := []v1.TopologySpreadConstraint{ + namespaceTopologySpreadConstraint := []topologySpreadConstraint{ { - MaxSkew: 2, - TopologyKey: "zone", - WhenUnsatisfiable: v1.DoNotSchedule, - LabelSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}, + maxSkew: 2, + topologyKey: "zone", + selector: selector.DeepCopySelector(), }, } testCases := []struct { name string - namespaceTopologySpreadConstraints []v1.TopologySpreadConstraint - newConstraint v1.TopologySpreadConstraint + namespaceTopologySpreadConstraints []topologySpreadConstraint + newConstraint topologySpreadConstraint expectedResult bool }{ { @@ -1647,19 +1645,7 @@ func TestCheckIdenticalConstraints(t *testing.T) { } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - var constraintSets []topologyConstraintSet - 
for _, constraints := range tc.namespaceTopologySpreadConstraints { - constraintSets = append(constraintSets, topologyConstraintSet{ - constraint: constraints, - podNodeAffinity: nodeaffinity.RequiredNodeAffinity{}, - podTolerations: []v1.Toleration{}, - }) - } - isIdentical := hasIdenticalConstraints(topologyConstraintSet{ - constraint: tc.newConstraint, - podNodeAffinity: nodeaffinity.RequiredNodeAffinity{}, - podTolerations: []v1.Toleration{}, - }, constraintSets) + isIdentical := hasIdenticalConstraints(tc.newConstraint, tc.namespaceTopologySpreadConstraints) if isIdentical != tc.expectedResult { t.Errorf("Test error for description: %s. Expected result %v, got %v", tc.name, tc.expectedResult, isIdentical) } diff --git a/test/e2e/e2e_failedpods_test.go b/test/e2e/e2e_failedpods_test.go index 7efcc7c4c5..6908d3f36f 100644 --- a/test/e2e/e2e_failedpods_test.go +++ b/test/e2e/e2e_failedpods_test.go @@ -17,6 +17,7 @@ import ( "sigs.k8s.io/descheduler/pkg/framework/plugins/defaultevictor" "sigs.k8s.io/descheduler/pkg/framework/plugins/removefailedpods" frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types" + "sigs.k8s.io/descheduler/test" ) var oneHourPodLifetimeSeconds uint = 3600 @@ -131,7 +132,7 @@ func TestFailedPods(t *testing.T) { } func initFailedJob(name, namespace string) *batchv1.Job { - podSpec := MakePodSpec("", nil) + podSpec := test.MakePodSpec("", nil) podSpec.Containers[0].Command = []string{"/bin/false"} podSpec.RestartPolicy = v1.RestartPolicyNever labelsSet := labels.Set{"test": name, "name": name} diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index d3aa08e7f5..191bd01f39 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -55,47 +55,9 @@ import ( "sigs.k8s.io/descheduler/pkg/framework/plugins/podlifetime" frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types" "sigs.k8s.io/descheduler/pkg/utils" + "sigs.k8s.io/descheduler/test" ) -func MakePodSpec(priorityClassName string, gracePeriod *int64) v1.PodSpec { - return v1.PodSpec{ - SecurityContext: &v1.PodSecurityContext{ - RunAsNonRoot: utilpointer.Bool(true), - RunAsUser: utilpointer.Int64(1000), - RunAsGroup: utilpointer.Int64(1000), - SeccompProfile: &v1.SeccompProfile{ - Type: v1.SeccompProfileTypeRuntimeDefault, - }, - }, - Containers: []v1.Container{{ - Name: "pause", - ImagePullPolicy: "Never", - Image: "registry.k8s.io/pause", - Ports: []v1.ContainerPort{{ContainerPort: 80}}, - Resources: v1.ResourceRequirements{ - Limits: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("100m"), - v1.ResourceMemory: resource.MustParse("200Mi"), - }, - Requests: v1.ResourceList{ - v1.ResourceCPU: resource.MustParse("100m"), - v1.ResourceMemory: resource.MustParse("100Mi"), - }, - }, - SecurityContext: &v1.SecurityContext{ - AllowPrivilegeEscalation: utilpointer.Bool(false), - Capabilities: &v1.Capabilities{ - Drop: []v1.Capability{ - "ALL", - }, - }, - }, - }}, - PriorityClassName: priorityClassName, - TerminationGracePeriodSeconds: gracePeriod, - } -} - // RcByNameContainer returns a ReplicationController with specified name and container func RcByNameContainer(name, namespace string, replicas int32, labels map[string]string, gracePeriod *int64, priorityClassName string) *v1.ReplicationController { // Add "name": name to the labels, overwriting if it exists. 
@@ -121,7 +83,7 @@ func RcByNameContainer(name, namespace string, replicas int32, labels map[string ObjectMeta: metav1.ObjectMeta{ Labels: labels, }, - Spec: MakePodSpec(priorityClassName, gracePeriod), + Spec: test.MakePodSpec(priorityClassName, gracePeriod), }, }, } diff --git a/test/e2e/e2e_topologyspreadconstraint_test.go b/test/e2e/e2e_topologyspreadconstraint_test.go index 48ebaab275..6c925e4e25 100644 --- a/test/e2e/e2e_topologyspreadconstraint_test.go +++ b/test/e2e/e2e_topologyspreadconstraint_test.go @@ -14,6 +14,7 @@ import ( "sigs.k8s.io/descheduler/pkg/framework/plugins/defaultevictor" "sigs.k8s.io/descheduler/pkg/framework/plugins/removepodsviolatingtopologyspreadconstraint" frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types" + "sigs.k8s.io/descheduler/test" ) const zoneTopologyKey string = "topology.kubernetes.io/zone" @@ -39,7 +40,7 @@ func TestTopologySpreadConstraint(t *testing.T) { replicaCount int topologySpreadConstraint v1.TopologySpreadConstraint }{ - "test-rc-topology-spread-hard-constraint": { + "test-topology-spread-hard-constraint": { expectedEvictedCount: 1, replicaCount: 4, topologySpreadConstraint: v1.TopologySpreadConstraint{ @@ -53,7 +54,7 @@ func TestTopologySpreadConstraint(t *testing.T) { WhenUnsatisfiable: v1.DoNotSchedule, }, }, - "test-rc-topology-spread-soft-constraint": { + "test-topology-spread-soft-constraint": { expectedEvictedCount: 1, replicaCount: 4, topologySpreadConstraint: v1.TopologySpreadConstraint{ @@ -67,7 +68,7 @@ func TestTopologySpreadConstraint(t *testing.T) { WhenUnsatisfiable: v1.ScheduleAnyway, }, }, - "test-rc-node-taints-policy-honor": { + "test-node-taints-policy-honor": { expectedEvictedCount: 1, replicaCount: 4, topologySpreadConstraint: v1.TopologySpreadConstraint{ @@ -82,7 +83,7 @@ func TestTopologySpreadConstraint(t *testing.T) { WhenUnsatisfiable: v1.DoNotSchedule, }, }, - "test-rc-node-affinity-policy-ignore": { + "test-node-affinity-policy-ignore": { expectedEvictedCount: 1, replicaCount: 4, topologySpreadConstraint: v1.TopologySpreadConstraint{ @@ -97,7 +98,7 @@ func TestTopologySpreadConstraint(t *testing.T) { WhenUnsatisfiable: v1.DoNotSchedule, }, }, - "test-rc-match-label-keys": { + "test-match-label-keys": { expectedEvictedCount: 0, replicaCount: 4, topologySpreadConstraint: v1.TopologySpreadConstraint{ @@ -115,26 +116,27 @@ func TestTopologySpreadConstraint(t *testing.T) { } for name, tc := range testCases { t.Run(name, func(t *testing.T) { - t.Logf("Creating RC %s with %d replicas", name, tc.replicaCount) - rc := RcByNameContainer(name, testNamespace.Name, int32(tc.replicaCount), tc.topologySpreadConstraint.LabelSelector.DeepCopy().MatchLabels, nil, "") - rc.Spec.Template.Spec.TopologySpreadConstraints = []v1.TopologySpreadConstraint{tc.topologySpreadConstraint} - if _, err := clientSet.CoreV1().ReplicationControllers(rc.Namespace).Create(ctx, rc, metav1.CreateOptions{}); err != nil { - t.Fatalf("Error creating RC %s %v", name, err) + t.Logf("Creating Deployment %s with %d replicas", name, tc.replicaCount) + deployment := test.BuildTestDeployment(name, testNamespace.Name, int32(tc.replicaCount), tc.topologySpreadConstraint.LabelSelector.DeepCopy().MatchLabels, func(d *appsv1.Deployment) { + d.Spec.Template.Spec.TopologySpreadConstraints = []v1.TopologySpreadConstraint{tc.topologySpreadConstraint} + }) + if _, err := clientSet.AppsV1().Deployments(deployment.Namespace).Create(ctx, deployment, metav1.CreateOptions{}); err != nil { + t.Fatalf("Error creating Deployment %s %v", name, err) } - defer 
deleteRC(ctx, t, clientSet, rc) - waitForRCPodsRunning(ctx, t, clientSet, rc) + defer test.DeleteDeployment(ctx, t, clientSet, deployment) + test.WaitForDeploymentPodsRunning(ctx, t, clientSet, deployment) - // Create a "Violator" RC that has the same label and is forced to be on the same node using a nodeSelector - violatorRcName := name + "-violator" + // Create a "Violator" Deployment that has the same label and is forced to be on the same node using a nodeSelector + violatorDeploymentName := name + "-violator" violatorCount := tc.topologySpreadConstraint.MaxSkew + 1 - violatorRc := RcByNameContainer(violatorRcName, testNamespace.Name, violatorCount, tc.topologySpreadConstraint.LabelSelector.DeepCopy().MatchLabels, nil, "") - violatorRc.Spec.Template.Spec.NodeSelector = map[string]string{zoneTopologyKey: workerNodes[0].Labels[zoneTopologyKey]} - rc.Spec.Template.Spec.TopologySpreadConstraints = []v1.TopologySpreadConstraint{tc.topologySpreadConstraint} - if _, err := clientSet.CoreV1().ReplicationControllers(rc.Namespace).Create(ctx, violatorRc, metav1.CreateOptions{}); err != nil { - t.Fatalf("Error creating RC %s: %v", violatorRcName, err) + violatorDeployment := test.BuildTestDeployment(violatorDeploymentName, testNamespace.Name, violatorCount, tc.topologySpreadConstraint.LabelSelector.DeepCopy().MatchLabels, func(d *appsv1.Deployment) { + d.Spec.Template.Spec.NodeSelector = map[string]string{zoneTopologyKey: workerNodes[0].Labels[zoneTopologyKey]} + }) + if _, err := clientSet.AppsV1().Deployments(deployment.Namespace).Create(ctx, violatorDeployment, metav1.CreateOptions{}); err != nil { + t.Fatalf("Error creating Deployment %s: %v", violatorDeploymentName, err) } - defer deleteRC(ctx, t, clientSet, violatorRc) - waitForRCPodsRunning(ctx, t, clientSet, violatorRc) + defer test.DeleteDeployment(ctx, t, clientSet, violatorDeployment) + test.WaitForDeploymentPodsRunning(ctx, t, clientSet, violatorDeployment) podEvictor := initPodEvictorOrFail(t, clientSet, getPodsAssignedToNode, nodes) @@ -177,7 +179,7 @@ func TestTopologySpreadConstraint(t *testing.T) { t.Logf("Finished RemovePodsViolatingTopologySpreadConstraint strategy for %s", name) t.Logf("Wait for terminating pods of %s to disappear", name) - waitForTerminatingPodsToDisappear(ctx, t, clientSet, rc.Namespace) + waitForTerminatingPodsToDisappear(ctx, t, clientSet, deployment.Namespace) if totalEvicted := podEvictor.TotalEvicted(); totalEvicted == tc.expectedEvictedCount { t.Logf("Total of %d Pods were evicted for %s", totalEvicted, name) @@ -190,7 +192,7 @@ func TestTopologySpreadConstraint(t *testing.T) { } // Ensure recently evicted Pod are rescheduled and running before asserting for a balanced topology spread - waitForRCPodsRunning(ctx, t, clientSet, rc) + test.WaitForDeploymentPodsRunning(ctx, t, clientSet, deployment) listOptions := metav1.ListOptions{LabelSelector: labels.SelectorFromSet(tc.topologySpreadConstraint.LabelSelector.MatchLabels).String()} pods, err := clientSet.CoreV1().Pods(testNamespace.Name).List(ctx, listOptions) diff --git a/test/test_utils.go b/test/test_utils.go index ccd9bd70e6..739045602e 100644 --- a/test/test_utils.go +++ b/test/test_utils.go @@ -17,13 +17,58 @@ limitations under the License. 
package test import ( + "context" "fmt" + "strings" + "testing" + "time" + appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/wait" + clientset "k8s.io/client-go/kubernetes" + utilpointer "k8s.io/utils/pointer" ) +func BuildTestDeployment(name, namespace string, replicas int32, labels map[string]string, apply func(deployment *appsv1.Deployment)) *appsv1.Deployment { + // Add "name": name to the labels, overwriting if it exists. + labels["name"] = name + + deployment := &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + Kind: "Deployment", + APIVersion: "apps/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: utilpointer.Int32(replicas), + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "name": name, + }, + }, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + }, + Spec: MakePodSpec("", utilpointer.Int64(0)), + }, + }, + } + + if apply != nil { + apply(deployment) + } + + return deployment +} + // BuildTestPod creates a test pod with given parameters. func BuildTestPod(name string, cpu, memory int64, nodeName string, apply func(*v1.Pod)) *v1.Pod { pod := &v1.Pod{ @@ -124,6 +169,45 @@ func BuildTestNode(name string, millicpu, mem, pods int64, apply func(*v1.Node)) return node } +func MakePodSpec(priorityClassName string, gracePeriod *int64) v1.PodSpec { + return v1.PodSpec{ + SecurityContext: &v1.PodSecurityContext{ + RunAsNonRoot: utilpointer.Bool(true), + RunAsUser: utilpointer.Int64(1000), + RunAsGroup: utilpointer.Int64(1000), + SeccompProfile: &v1.SeccompProfile{ + Type: v1.SeccompProfileTypeRuntimeDefault, + }, + }, + Containers: []v1.Container{{ + Name: "pause", + ImagePullPolicy: "Never", + Image: "registry.k8s.io/pause", + Ports: []v1.ContainerPort{{ContainerPort: 80}}, + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("100m"), + v1.ResourceMemory: resource.MustParse("200Mi"), + }, + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("100m"), + v1.ResourceMemory: resource.MustParse("100Mi"), + }, + }, + SecurityContext: &v1.SecurityContext{ + AllowPrivilegeEscalation: utilpointer.Bool(false), + Capabilities: &v1.Capabilities{ + Drop: []v1.Capability{ + "ALL", + }, + }, + }, + }}, + PriorityClassName: priorityClassName, + TerminationGracePeriodSeconds: gracePeriod, + } +} + // MakeBestEffortPod makes the given pod a BestEffort pod func MakeBestEffortPod(pod *v1.Pod) { pod.Spec.Containers[0].Resources.Requests = nil @@ -179,8 +263,77 @@ func SetPodExtendedResourceRequest(pod *v1.Pod, resourceName v1.ResourceName, re pod.Spec.Containers[0].Resources.Requests[resourceName] = *resource.NewQuantity(requestQuantity, resource.DecimalSI) } -// SetNodeExtendedResouces sets the given node's extended resources +// SetNodeExtendedResource sets the given node's extended resources func SetNodeExtendedResource(node *v1.Node, resourceName v1.ResourceName, requestQuantity int64) { node.Status.Capacity[resourceName] = *resource.NewQuantity(requestQuantity, resource.DecimalSI) node.Status.Allocatable[resourceName] = *resource.NewQuantity(requestQuantity, resource.DecimalSI) } + +func DeleteDeployment(ctx context.Context, t *testing.T, clientSet clientset.Interface, deployment *appsv1.Deployment) { + // set number of replicas to 0 + deploymentCopy := 
deployment.DeepCopy()
+	deploymentCopy.Spec.Replicas = utilpointer.Int32(0)
+	if _, err := clientSet.AppsV1().Deployments(deploymentCopy.Namespace).Update(ctx, deploymentCopy, metav1.UpdateOptions{}); err != nil {
+		t.Fatalf("Error updating Deployment %v", err)
+	}
+
+	// wait 30 seconds until all pods are deleted
+	if err := wait.PollUntilContextTimeout(ctx, 5*time.Second, 30*time.Second, true, func(c context.Context) (bool, error) {
+		scale, err := clientSet.AppsV1().Deployments(deployment.Namespace).GetScale(c, deployment.Name, metav1.GetOptions{})
+		if err != nil {
+			return false, err
+		}
+		return scale.Spec.Replicas == 0, nil
+	}); err != nil {
+		t.Fatalf("Error deleting Deployment pods %v", err)
+	}
+
+	if err := wait.PollUntilContextTimeout(ctx, 5*time.Second, 30*time.Second, true, func(c context.Context) (bool, error) {
+		podList, _ := clientSet.CoreV1().Pods(deployment.Namespace).List(ctx, metav1.ListOptions{LabelSelector: labels.SelectorFromSet(deployment.Spec.Template.Labels).String()})
+		t.Logf("Waiting for %v Deployment pods to disappear, still %v remaining", deployment.Name, len(podList.Items))
+		if len(podList.Items) > 0 {
+			return false, nil
+		}
+		return true, nil
+	}); err != nil {
+		t.Fatalf("Error waiting for Deployment pods to disappear: %v", err)
+	}
+
+	if err := clientSet.AppsV1().Deployments(deployment.Namespace).Delete(ctx, deployment.Name, metav1.DeleteOptions{}); err != nil {
+		t.Fatalf("Error deleting Deployment %v", err)
+	}
+
+	if err := wait.PollUntilContextTimeout(ctx, 5*time.Second, 30*time.Second, true, func(c context.Context) (bool, error) {
+		_, err := clientSet.AppsV1().Deployments(deployment.Namespace).Get(ctx, deployment.Name, metav1.GetOptions{})
+		if err != nil && strings.Contains(err.Error(), "not found") {
+			return true, nil
+		}
+		return false, nil
+	}); err != nil {
+		t.Fatalf("Error deleting Deployment %v", err)
+	}
+}
+
+func WaitForDeploymentPodsRunning(ctx context.Context, t *testing.T, clientSet clientset.Interface, deployment *appsv1.Deployment) {
+	if err := wait.PollUntilContextTimeout(ctx, 5*time.Second, 30*time.Second, true, func(c context.Context) (bool, error) {
+		podList, err := clientSet.CoreV1().Pods(deployment.Namespace).List(ctx, metav1.ListOptions{
+			LabelSelector: labels.SelectorFromSet(deployment.Spec.Template.ObjectMeta.Labels).String(),
+		})
+		if err != nil {
+			return false, err
+		}
+		if len(podList.Items) != int(*deployment.Spec.Replicas) {
+			t.Logf("Waiting for %v pods to be created, got %v instead", *deployment.Spec.Replicas, len(podList.Items))
+			return false, nil
+		}
+		for _, pod := range podList.Items {
+			if pod.Status.Phase != v1.PodRunning {
+				t.Logf("Pod %v not running yet, is %v instead", pod.Name, pod.Status.Phase)
+				return false, nil
+			}
+		}
+		return true, nil
+	}); err != nil {
+		t.Fatalf("Error waiting for pods running: %v", err)
+	}
+}
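
For reference, a minimal standalone sketch of how the new newTopologySpreadConstraint/mergeLabelSetWithSelector path folds a constraint's MatchLabelKeys into the parsed selector. The pod labels, the "pod-template-hash" key, and the main() wrapper are illustrative assumptions and not part of the patch; only metav1.LabelSelectorAsSelector and the k8s.io/apimachinery labels package are relied on.

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
)

// mergeLabelSetWithSelector mirrors the helper added in this patch: it folds the
// requirements of an existing selector into a selector built from matchLabels.
func mergeLabelSetWithSelector(matchLabels labels.Set, s labels.Selector) labels.Selector {
	mergedSelector := labels.SelectorFromSet(matchLabels)
	requirements, ok := s.Requirements()
	if !ok {
		return s
	}
	for _, r := range requirements {
		mergedSelector = mergedSelector.Add(r)
	}
	return mergedSelector
}

func main() {
	// Constraint selector as it would appear on the pod spec.
	constraintSelector := &metav1.LabelSelector{MatchLabels: map[string]string{"app": "web"}}
	selector, err := metav1.LabelSelectorAsSelector(constraintSelector)
	if err != nil {
		panic(err)
	}

	// Hypothetical pod labels; "pod-template-hash" stands in for a MatchLabelKeys entry.
	podLabels := labels.Set{"app": "web", "pod-template-hash": "abc123"}
	matchLabelKeys := []string{"pod-template-hash"}

	// Collect the pod's values for the listed keys, as newTopologySpreadConstraint does.
	matchLabels := make(labels.Set)
	for _, key := range matchLabelKeys {
		if value, ok := podLabels[key]; ok {
			matchLabels[key] = value
		}
	}
	if len(matchLabels) > 0 {
		selector = mergeLabelSetWithSelector(matchLabels, selector)
	}

	fmt.Println(selector.String())                                                          // e.g. app=web,pod-template-hash=abc123
	fmt.Println(selector.Matches(labels.Set{"app": "web", "pod-template-hash": "abc123"})) // true
	fmt.Println(selector.Matches(labels.Set{"app": "web", "pod-template-hash": "zzz999"})) // false
}
```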
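
For context, a hedged sketch of how the new test helpers (test.BuildTestDeployment, test.WaitForDeploymentPodsRunning, test.DeleteDeployment) are expected to be combined in an e2e test, mirroring e2e_topologyspreadconstraint_test.go. The kubeconfig wiring, the "default" namespace, and the object names below are assumptions for illustration only.

```go
package e2e

import (
	"context"
	"os"
	"testing"

	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"

	"sigs.k8s.io/descheduler/test"
)

// TestTopologySpreadDeploymentHelpers sketches the create/wait/cleanup lifecycle
// provided by the helpers added in test/test_utils.go.
func TestTopologySpreadDeploymentHelpers(t *testing.T) {
	ctx := context.Background()

	config, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		t.Fatalf("Error building kubeconfig: %v", err)
	}
	clientSet, err := kubernetes.NewForConfig(config)
	if err != nil {
		t.Fatalf("Error building clientset: %v", err)
	}

	// Build a Deployment whose pods carry a zone spread constraint; BuildTestDeployment
	// also injects a "name" label and selects on it.
	deployment := test.BuildTestDeployment("spread-example", "default", 4,
		map[string]string{"app": "spread-example"},
		func(d *appsv1.Deployment) {
			d.Spec.Template.Spec.TopologySpreadConstraints = []v1.TopologySpreadConstraint{{
				MaxSkew:           1,
				TopologyKey:       "topology.kubernetes.io/zone",
				WhenUnsatisfiable: v1.DoNotSchedule,
				LabelSelector:     &metav1.LabelSelector{MatchLabels: map[string]string{"app": "spread-example"}},
			}}
		})

	if _, err := clientSet.AppsV1().Deployments(deployment.Namespace).Create(ctx, deployment, metav1.CreateOptions{}); err != nil {
		t.Fatalf("Error creating Deployment: %v", err)
	}
	// Scale to zero, wait for the pods to disappear, then delete the Deployment itself.
	defer test.DeleteDeployment(ctx, t, clientSet, deployment)
	// Block until every replica reports PodRunning.
	test.WaitForDeploymentPodsRunning(ctx, t, clientSet, deployment)
}
```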