From 4ee689ac2fc164c825c8524694a6743373bcfae4 Mon Sep 17 00:00:00 2001 From: jiangkaihua Date: Wed, 29 Mar 2023 16:41:16 +0800 Subject: [PATCH] Refactor namespace fairshare function. Signed-off-by: jiangkaihua --- pkg/scheduler/actions/allocate/allocate.go | 84 ++---- .../actions/allocate/allocate_test.go | 7 +- pkg/scheduler/api/cluster_info.go | 3 +- pkg/scheduler/api/namespace_info.go | 91 +----- pkg/scheduler/api/namespace_info_test.go | 66 +---- pkg/scheduler/cache/cache.go | 5 +- pkg/scheduler/conf/scheduler_conf.go | 2 - pkg/scheduler/framework/session.go | 3 - pkg/scheduler/framework/session_plugins.go | 30 -- pkg/scheduler/plugins/defaults.go | 3 - pkg/scheduler/plugins/drf/drf.go | 127 --------- pkg/scheduler/util_test.go | 259 +++++++++--------- test/e2e/schedulingbase/job_scheduling.go | 114 -------- test/e2e/util/configmap.go | 2 - 14 files changed, 164 insertions(+), 632 deletions(-) diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index e3d1c5a2e2..206fde509a 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -46,18 +46,16 @@ func (alloc *Action) Execute(ssn *framework.Session) { defer klog.V(3).Infof("Leaving Allocate ...") // the allocation for pod may have many stages - // 1. pick a namespace named N (using ssn.NamespaceOrderFn) - // 2. pick a queue named Q from N (using ssn.QueueOrderFn) - // 3. pick a job named J from Q (using ssn.JobOrderFn) - // 4. pick a task T from J (using ssn.TaskOrderFn) - // 5. use predicateFn to filter out node that T can not be allocated on. - // 6. use ssn.NodeOrderFn to judge the best node and assign it to T + // 1. pick a queue named Q (using ssn.QueueOrderFn) + // 2. pick a job named J from Q (using ssn.JobOrderFn) + // 3. pick a task T from J (using ssn.TaskOrderFn) + // 4. use predicateFn to filter out node that T can not be allocated on. + // 5. 
use ssn.NodeOrderFn to judge the best node and assign it to T - namespaces := util.NewPriorityQueue(ssn.NamespaceOrderFn) - - // jobsMap is map[api.NamespaceName]map[api.QueueID]PriorityQueue(*api.JobInfo) - // used to find job with highest priority in given queue and namespace - jobsMap := map[api.NamespaceName]map[api.QueueID]*util.PriorityQueue{} + // queues sort queues by QueueOrderFn. + queues := util.NewPriorityQueue(ssn.QueueOrderFn) + // jobsMap is used to find job with the highest priority in given queue. + jobsMap := map[api.QueueID]*util.PriorityQueue{} for _, job := range ssn.Jobs { // If not config enqueue action, change Pending pg into Inqueue statue to avoid blocking job scheduling. @@ -84,26 +82,16 @@ func (alloc *Action) Execute(ssn *framework.Session) { continue } - namespace := api.NamespaceName(job.Namespace) - queueMap, found := jobsMap[namespace] - if !found { - namespaces.Push(namespace) - - queueMap = make(map[api.QueueID]*util.PriorityQueue) - jobsMap[namespace] = queueMap - } - - jobs, found := queueMap[job.Queue] - if !found { - jobs = util.NewPriorityQueue(ssn.JobOrderFn) - queueMap[job.Queue] = jobs + if _, found := jobsMap[job.Queue]; !found { + jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn) + queues.Push(ssn.Queues[job.Queue]) } klog.V(4).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue) - jobs.Push(job) + jobsMap[job.Queue].Push(job) } - klog.V(3).Infof("Try to allocate resource to %d Namespaces", len(jobsMap)) + klog.V(3).Infof("Try to allocate resource to %d Queues", len(jobsMap)) pendingTasks := map[api.JobID]*util.PriorityQueue{} @@ -121,49 +109,21 @@ func (alloc *Action) Execute(ssn *framework.Session) { // Because we believe that number of queues would less than namespaces in most case. // And, this action would make the resource usage among namespace balanced. 
for { - if namespaces.Empty() { + if queues.Empty() { break } - // pick namespace from namespaces PriorityQueue - namespace := namespaces.Pop().(api.NamespaceName) - - queueInNamespace := jobsMap[namespace] - - // pick queue for given namespace - // - // This block use an algorithm with time complex O(n). - // But at least PriorityQueue could not be used here, - // because the allocation of job would change the priority of queue among all namespaces, - // and the PriorityQueue have no ability to update priority for a special queue. - var queue *api.QueueInfo - for queueID := range queueInNamespace { - currentQueue := ssn.Queues[queueID] - if ssn.Overused(currentQueue) { - klog.V(3).Infof("Namespace <%s> Queue <%s> is overused, ignore it.", namespace, currentQueue.Name) - delete(queueInNamespace, queueID) - continue - } - if jobs, found := queueInNamespace[currentQueue.UID]; found && jobs.Empty() { - continue - } - - if queue == nil || ssn.QueueOrderFn(currentQueue, queue) { - queue = currentQueue - } - } + queue := queues.Pop().(*api.QueueInfo) - if queue == nil { - klog.V(3).Infof("Namespace <%s> have no queue, skip it", namespace) + if ssn.Overused(queue) { + klog.V(3).Infof("Queue <%s> is overused, ignore it.", queue.Name) continue } - klog.V(3).Infof("Try to allocate resource to Jobs in Namespace <%s> Queue <%v>", namespace, queue.Name) + klog.V(3).Infof("Try to allocate resource to Jobs in Queue <%s>", queue.Name) - jobs, found := queueInNamespace[queue.UID] + jobs, found := jobsMap[queue.UID] if !found || jobs.Empty() { - delete(queueInNamespace, queue.UID) - namespaces.Push(namespace) klog.V(4).Infof("Can not find jobs for queue %s.", queue.Name) continue } @@ -185,8 +145,8 @@ func (alloc *Action) Execute(ssn *framework.Session) { } tasks := pendingTasks[job.UID] - // Added Namespace back until no job in Namespace. - namespaces.Push(namespace) + // Added Queue back until no job in Queue. 
+ queues.Push(queue) if tasks.Empty() { continue diff --git a/pkg/scheduler/actions/allocate/allocate_test.go b/pkg/scheduler/actions/allocate/allocate_test.go index 5c7549a71a..42b5c50e91 100644 --- a/pkg/scheduler/actions/allocate/allocate_test.go +++ b/pkg/scheduler/actions/allocate/allocate_test.go @@ -280,10 +280,9 @@ func TestAllocate(t *testing.T) { { Plugins: []conf.PluginOption{ { - Name: "drf", - EnabledPreemptable: &trueValue, - EnabledJobOrder: &trueValue, - EnabledNamespaceOrder: &trueValue, + Name: "drf", + EnabledPreemptable: &trueValue, + EnabledJobOrder: &trueValue, }, { Name: "proportion", diff --git a/pkg/scheduler/api/cluster_info.go b/pkg/scheduler/api/cluster_info.go index 53569a8840..cc43805a90 100644 --- a/pkg/scheduler/api/cluster_info.go +++ b/pkg/scheduler/api/cluster_info.go @@ -65,8 +65,7 @@ func (ci ClusterInfo) String() string { if len(ci.NamespaceInfo) != 0 { str += "Namespaces:\n" for _, ns := range ci.NamespaceInfo { - str += fmt.Sprintf("\t Namespace(%s) Weight(%v)\n", - ns.Name, ns.Weight) + str += fmt.Sprintf("\t Namespace(%s)\n", ns.Name) } } diff --git a/pkg/scheduler/api/namespace_info.go b/pkg/scheduler/api/namespace_info.go index ae3656986a..519b5a3db4 100644 --- a/pkg/scheduler/api/namespace_info.go +++ b/pkg/scheduler/api/namespace_info.go @@ -17,68 +17,23 @@ limitations under the License. package api import ( - "fmt" - v1 "k8s.io/api/core/v1" - "k8s.io/client-go/tools/cache" - "k8s.io/klog/v2" ) // NamespaceName is name of namespace type NamespaceName string -const ( - // NamespaceWeightKey is the key in ResourceQuota.spec.hard indicating the weight of this namespace - NamespaceWeightKey = "volcano.sh/namespace.weight" - // DefaultNamespaceWeight is the default weight of namespace - DefaultNamespaceWeight = 1 -) - // NamespaceInfo records information of namespace type NamespaceInfo struct { // Name is the name of this namespace Name NamespaceName - // Weight is the highest weight among many ResourceQuota. 
- Weight int64 - // QuotaStatus stores the ResourceQuotaStatus of all ResourceQuotas in this namespace QuotaStatus map[string]v1.ResourceQuotaStatus } -// GetWeight returns weight of a namespace, any invalid case would get default value -func (n *NamespaceInfo) GetWeight() int64 { - if n == nil || n.Weight == 0 { - return DefaultNamespaceWeight - } - return n.Weight -} - -type quotaItem struct { - name string - weight int64 -} - -func quotaItemKeyFunc(obj interface{}) (string, error) { - item, ok := obj.(*quotaItem) - if !ok { - return "", fmt.Errorf("obj with type %T could not parse", obj) - } - return item.name, nil -} - -// for big root heap -func quotaItemLessFunc(a interface{}, b interface{}) bool { - A := a.(*quotaItem) - B := b.(*quotaItem) - return A.weight > B.weight -} - // NamespaceCollection will record all details about namespace type NamespaceCollection struct { - Name string - - quotaWeight *cache.Heap - + Name string QuotaStatus map[string]v1.ResourceQuotaStatus } @@ -86,69 +41,25 @@ type NamespaceCollection struct { func NewNamespaceCollection(name string) *NamespaceCollection { n := &NamespaceCollection{ Name: name, - quotaWeight: cache.NewHeap(quotaItemKeyFunc, quotaItemLessFunc), QuotaStatus: make(map[string]v1.ResourceQuotaStatus), } - // add at least one item into quotaWeight. 
- // Because cache.Heap.Pop would be blocked until queue is not empty - n.updateWeight(&quotaItem{ - name: NamespaceWeightKey, - weight: DefaultNamespaceWeight, - }) return n } -func (n *NamespaceCollection) deleteWeight(q *quotaItem) { - n.quotaWeight.Delete(q) -} - -func (n *NamespaceCollection) updateWeight(q *quotaItem) { - n.quotaWeight.Update(q) -} - -func itemFromQuota(quota *v1.ResourceQuota) *quotaItem { - var weight int64 = DefaultNamespaceWeight - - quotaWeight, ok := quota.Spec.Hard[NamespaceWeightKey] - if ok { - weight = quotaWeight.Value() - } - - item := &quotaItem{ - name: quota.Name, - weight: weight, - } - return item -} - // Update modify the registered information according quota object func (n *NamespaceCollection) Update(quota *v1.ResourceQuota) { - n.updateWeight(itemFromQuota(quota)) n.QuotaStatus[quota.Name] = quota.Status } // Delete remove the registered information according quota object func (n *NamespaceCollection) Delete(quota *v1.ResourceQuota) { - n.deleteWeight(itemFromQuota(quota)) delete(n.QuotaStatus, quota.Name) } // Snapshot will clone a NamespaceInfo without Heap according NamespaceCollection func (n *NamespaceCollection) Snapshot() *NamespaceInfo { - var weight int64 = DefaultNamespaceWeight - - obj, err := n.quotaWeight.Pop() - if err != nil { - klog.Warningf("namespace %s, quota weight meets error %v when pop", n.Name, err) - } else { - item := obj.(*quotaItem) - weight = item.weight - n.quotaWeight.Add(item) - } - return &NamespaceInfo{ Name: NamespaceName(n.Name), - Weight: weight, QuotaStatus: n.QuotaStatus, } } diff --git a/pkg/scheduler/api/namespace_info_test.go b/pkg/scheduler/api/namespace_info_test.go index ddf1706697..3c4b7fac78 100644 --- a/pkg/scheduler/api/namespace_info_test.go +++ b/pkg/scheduler/api/namespace_info_test.go @@ -23,7 +23,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func newQuota(name string, weight, req int) *v1.ResourceQuota { +func newQuota(name string, req int) *v1.ResourceQuota 
{ q := &v1.ResourceQuota{ TypeMeta: metav1.TypeMeta{}, ObjectMeta: metav1.ObjectMeta{ @@ -39,16 +39,12 @@ func newQuota(name string, weight, req int) *v1.ResourceQuota { }, } - if weight >= 0 { - q.Spec.Hard[v1.ResourceName(NamespaceWeightKey)] = *resource.NewQuantity(int64(weight), resource.DecimalSI) - } - return q } func TestNamespaceCollection(t *testing.T) { c := NewNamespaceCollection("testCollection") - c.Update(newQuota("abc", 123, 1)) + c.Update(newQuota("abc", 1)) info := c.Snapshot() req := info.QuotaStatus["abc"].Hard[v1.ResourceCPU] @@ -56,67 +52,25 @@ func TestNamespaceCollection(t *testing.T) { t.Errorf("cpu request of quota %s should be %d, but got %d", "abc", 1, req.Value()) } - c.Update(newQuota("abc", 456, 2)) - c.Update(newQuota("def", -1, 4)) - c.Update(newQuota("def", 16, 8)) - c.Update(newQuota("ghi", 0, 16)) + c.Update(newQuota("abc", 2)) + c.Update(newQuota("def", 4)) + c.Update(newQuota("def", 8)) + c.Update(newQuota("ghi", 16)) info = c.Snapshot() - if info.Weight != 456 { - t.Errorf("weight of namespace should be %d, but got %d", 456, info.Weight) - } req = info.QuotaStatus["abc"].Hard[v1.ResourceCPU] if req.Value() != 2 { t.Errorf("cpu request of quota %s should be %d, but got %d", "abc", 2, req.Value()) } - c.Delete(newQuota("abc", 0, 0)) + c.Delete(newQuota("abc", 0)) info = c.Snapshot() - if info.Weight != 16 { - t.Errorf("weight of namespace should be %d, but not %d", 16, info.Weight) - } if _, ok := info.QuotaStatus["abc"]; ok { t.Errorf("QuotaStatus abc of namespace should not exist") } - c.Delete(newQuota("abc", 0, 0)) - c.Delete(newQuota("def", 15, 0)) - c.Delete(newQuota("ghi", -1, 0)) - - info = c.Snapshot() - if info.Weight != DefaultNamespaceWeight { - t.Errorf("weight of namespace should be default weight %d, but not %d", DefaultNamespaceWeight, info.Weight) - } -} - -func TestEmptyNamespaceCollection(t *testing.T) { - c := NewNamespaceCollection("testEmptyCollection") - - info := c.Snapshot() - if info.Weight != 
DefaultNamespaceWeight { - t.Errorf("weight of namespace should be %d, but not %d", DefaultNamespaceWeight, info.Weight) - } - - // snapshot can be called anytime - info = c.Snapshot() - if info.Weight != DefaultNamespaceWeight { - t.Errorf("weight of namespace should be %d, but not %d", DefaultNamespaceWeight, info.Weight) - } - - c.Delete(newQuota("abc", 0, 0)) - - info = c.Snapshot() - if info.Weight != DefaultNamespaceWeight { - t.Errorf("weight of namespace should be %d, but not %d", DefaultNamespaceWeight, info.Weight) - } - - c.Delete(newQuota("abc", 0, 0)) - c.Delete(newQuota("def", 15, 0)) - c.Delete(newQuota("ghi", -1, 0)) - - info = c.Snapshot() - if info.Weight != DefaultNamespaceWeight { - t.Errorf("weight of namespace should be default weight %d, but not %d", DefaultNamespaceWeight, info.Weight) - } + c.Delete(newQuota("abc", 0)) + c.Delete(newQuota("def", 0)) + c.Delete(newQuota("ghi", 0)) } diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index bc2a718afc..fc4d028bb1 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -1099,8 +1099,6 @@ func (sc *SchedulerCache) Snapshot() *schedulingapi.ClusterInfo { for _, value := range sc.NamespaceCollection { info := value.Snapshot() snapshot.NamespaceInfo[info.Name] = info - klog.V(4).Infof("Namespace %s has weight %v", - value.Name, info.GetWeight()) } for _, value := range sc.Jobs { @@ -1161,8 +1159,7 @@ func (sc *SchedulerCache) String() string { str += "Namespaces:\n" for _, ns := range sc.NamespaceCollection { info := ns.Snapshot() - str += fmt.Sprintf("\t Namespace(%s) Weight(%v)\n", - info.Name, info.Weight) + str += fmt.Sprintf("\t Namespace(%s)\n", info.Name) } } diff --git a/pkg/scheduler/conf/scheduler_conf.go b/pkg/scheduler/conf/scheduler_conf.go index b2bd4d9062..ec34fb9df2 100644 --- a/pkg/scheduler/conf/scheduler_conf.go +++ b/pkg/scheduler/conf/scheduler_conf.go @@ -49,8 +49,6 @@ type PluginOption struct { Name string `yaml:"name"` // 
EnabledJobOrder defines whether jobOrderFn is enabled EnabledJobOrder *bool `yaml:"enableJobOrder"` - // EnabledNamespaceOrder defines whether namespaceOrderFn is enabled - EnabledNamespaceOrder *bool `yaml:"enableNamespaceOrder"` // EnabledHierachy defines whether hierarchical sharing is enabled EnabledHierarchy *bool `yaml:"enableHierarchy"` // EnabledJobReady defines whether jobReadyFn is enabled diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index 539bb2535f..bf4bdcc191 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -77,7 +77,6 @@ type Session struct { jobOrderFns map[string]api.CompareFn queueOrderFns map[string]api.CompareFn taskOrderFns map[string]api.CompareFn - namespaceOrderFns map[string]api.CompareFn clusterOrderFns map[string]api.CompareFn predicateFns map[string]api.PredicateFn prePredicateFns map[string]api.PrePredicateFn @@ -123,7 +122,6 @@ func openSession(cache cache.Cache) *Session { jobOrderFns: map[string]api.CompareFn{}, queueOrderFns: map[string]api.CompareFn{}, taskOrderFns: map[string]api.CompareFn{}, - namespaceOrderFns: map[string]api.CompareFn{}, clusterOrderFns: map[string]api.CompareFn{}, predicateFns: map[string]api.PredicateFn{}, prePredicateFns: map[string]api.PrePredicateFn{}, @@ -235,7 +233,6 @@ func closeSession(ssn *Session) { ssn.plugins = nil ssn.eventHandlers = nil ssn.jobOrderFns = nil - ssn.namespaceOrderFns = nil ssn.queueOrderFns = nil ssn.clusterOrderFns = nil ssn.NodeList = nil diff --git a/pkg/scheduler/framework/session_plugins.go b/pkg/scheduler/framework/session_plugins.go index 45a632b1f3..76a3a41c63 100644 --- a/pkg/scheduler/framework/session_plugins.go +++ b/pkg/scheduler/framework/session_plugins.go @@ -44,11 +44,6 @@ func (ssn *Session) AddTaskOrderFn(name string, cf api.CompareFn) { ssn.taskOrderFns[name] = cf } -// AddNamespaceOrderFn add namespace order function -func (ssn *Session) AddNamespaceOrderFn(name string, cf 
api.CompareFn) { - ssn.namespaceOrderFns[name] = cf -} - // AddPreemptableFn add preemptable function func (ssn *Session) AddPreemptableFn(name string, cf api.EvictableFn) { ssn.preemptableFns[name] = cf @@ -524,31 +519,6 @@ func (ssn *Session) JobOrderFn(l, r interface{}) bool { return lv.CreationTimestamp.Before(&rv.CreationTimestamp) } -// NamespaceOrderFn invoke namespaceorder function of the plugins -func (ssn *Session) NamespaceOrderFn(l, r interface{}) bool { - for _, tier := range ssn.Tiers { - for _, plugin := range tier.Plugins { - if !isEnabled(plugin.EnabledNamespaceOrder) { - continue - } - nof, found := ssn.namespaceOrderFns[plugin.Name] - if !found { - continue - } - if j := nof(l, r); j != 0 { - return j < 0 - } - } - } - - // TODO(lminzhw): if all NamespaceOrderFn treat these two namespace as the same, - // we should make the job order have its affect among namespaces. - // or just schedule namespace one by one - lv := l.(api.NamespaceName) - rv := r.(api.NamespaceName) - return lv < rv -} - // ClusterOrderFn invoke ClusterOrderFn function of the plugins func (ssn *Session) ClusterOrderFn(l, r interface{}) bool { for _, tier := range ssn.Tiers { diff --git a/pkg/scheduler/plugins/defaults.go b/pkg/scheduler/plugins/defaults.go index d41c2662a4..c9ca3fad1c 100644 --- a/pkg/scheduler/plugins/defaults.go +++ b/pkg/scheduler/plugins/defaults.go @@ -25,9 +25,6 @@ func ApplyPluginConfDefaults(option *conf.PluginOption) { if option.EnabledJobOrder == nil { option.EnabledJobOrder = &t } - if option.EnabledNamespaceOrder == nil { - option.EnabledNamespaceOrder = &t - } if option.EnabledJobReady == nil { option.EnabledJobReady = &t } diff --git a/pkg/scheduler/plugins/drf/drf.go b/pkg/scheduler/plugins/drf/drf.go index b8ea3973bb..164fa1cbbd 100644 --- a/pkg/scheduler/plugins/drf/drf.go +++ b/pkg/scheduler/plugins/drf/drf.go @@ -152,19 +152,6 @@ func (drf *drfPlugin) HierarchyEnabled(ssn *framework.Session) bool { return false } -// NamespaceOrderEnabled 
returns the NamespaceOrder for this plugin is enabled in this session or not -func (drf *drfPlugin) NamespaceOrderEnabled(ssn *framework.Session) bool { - for _, tier := range ssn.Tiers { - for _, plugin := range tier.Plugins { - if plugin.Name != PluginName { - continue - } - return plugin.EnabledNamespaceOrder != nil && *plugin.EnabledNamespaceOrder - } - } - return false -} - func (drf *drfPlugin) compareQueues(root *hierarchicalNode, lqueue *api.QueueInfo, rqueue *api.QueueInfo) float64 { lnode := root lpaths := strings.Split(lqueue.Hierarchy, "/") @@ -203,7 +190,6 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) { klog.V(4).Infof("Total Allocatable %s", drf.totalResource) - namespaceOrderEnabled := drf.NamespaceOrderEnabled(ssn) hierarchyEnabled := drf.HierarchyEnabled(ssn) for _, job := range ssn.Jobs { @@ -224,18 +210,6 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) { drf.jobAttrs[job.UID] = attr - if namespaceOrderEnabled { - nsOpts, found := drf.namespaceOpts[job.Namespace] - if !found { - nsOpts = &drfAttr{ - allocated: api.EmptyResource(), - } - drf.namespaceOpts[job.Namespace] = nsOpts - } - // all task in job should have the same namespace with job - nsOpts.allocated.Add(attr.allocated) - drf.updateNamespaceShare(job.Namespace, nsOpts) - } if hierarchyEnabled { queue := ssn.Queues[job.Queue] drf.totalAllocated.Add(attr.allocated) @@ -250,57 +224,6 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) { victims = append(victims, candidate) } - if namespaceOrderEnabled { - // apply the namespace share policy on preemptee firstly - - lWeight := ssn.NamespaceInfo[api.NamespaceName(preemptor.Namespace)].GetWeight() - lNsAtt := drf.namespaceOpts[preemptor.Namespace] - lNsAlloc := lNsAtt.allocated.Clone().Add(preemptor.Resreq) - _, lNsShare := drf.calculateShare(lNsAlloc, drf.totalResource) - lNsShareWeighted := lNsShare / float64(lWeight) - - namespaceAllocation := map[string]*api.Resource{} - - // undecidedPreemptees 
means this policy could not judge preemptee is preemptable or not - // and left it to next policy - undecidedPreemptees := []*api.TaskInfo{} - - for _, preemptee := range preemptees { - if preemptor.Namespace == preemptee.Namespace { - // policy is disabled when they are in the same namespace - undecidedPreemptees = append(undecidedPreemptees, preemptee) - continue - } - - // compute the preemptee namespace weighted share after preemption - nsAllocation, found := namespaceAllocation[preemptee.Namespace] - if !found { - rNsAtt := drf.namespaceOpts[preemptee.Namespace] - nsAllocation = rNsAtt.allocated.Clone() - namespaceAllocation[preemptee.Namespace] = nsAllocation - } - rWeight := ssn.NamespaceInfo[api.NamespaceName(preemptee.Namespace)].GetWeight() - rNsAlloc := nsAllocation.Sub(preemptee.Resreq) - _, rNsShare := drf.calculateShare(rNsAlloc, drf.totalResource) - rNsShareWeighted := rNsShare / float64(rWeight) - - // to avoid ping pong actions, the preemptee namespace should - // have the higher weighted share after preemption. 
- if lNsShareWeighted < rNsShareWeighted { - addVictim(preemptee) - continue - } - if lNsShareWeighted-rNsShareWeighted > shareDelta { - continue - } - - // equal namespace order leads to judgement of jobOrder - undecidedPreemptees = append(undecidedPreemptees, preemptee) - } - - preemptees = undecidedPreemptees - } - latt := drf.jobAttrs[preemptor.Job] lalloc := latt.allocated.Clone().Add(preemptor.Resreq) _, ls := drf.calculateShare(lalloc, drf.totalResource) @@ -421,42 +344,6 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) { ssn.AddJobOrderFn(drf.Name(), jobOrderFn) - namespaceOrderFn := func(l interface{}, r interface{}) int { - lv := l.(api.NamespaceName) - rv := r.(api.NamespaceName) - - lOpt := drf.namespaceOpts[string(lv)] - rOpt := drf.namespaceOpts[string(rv)] - - lWeight := ssn.NamespaceInfo[lv].GetWeight() - rWeight := ssn.NamespaceInfo[rv].GetWeight() - - klog.V(4).Infof("DRF NamespaceOrderFn: <%v> share state: %f, weight %v, <%v> share state: %f, weight %v", - lv, lOpt.share, lWeight, rv, rOpt.share, rWeight) - - lWeightedShare := lOpt.share / float64(lWeight) - rWeightedShare := rOpt.share / float64(rWeight) - - metrics.UpdateNamespaceWeight(string(lv), lWeight) - metrics.UpdateNamespaceWeight(string(rv), rWeight) - metrics.UpdateNamespaceWeightedShare(string(lv), lWeightedShare) - metrics.UpdateNamespaceWeightedShare(string(rv), rWeightedShare) - - if lWeightedShare == rWeightedShare { - return 0 - } - - if lWeightedShare < rWeightedShare { - return -1 - } - - return 1 - } - - if namespaceOrderEnabled { - ssn.AddNamespaceOrderFn(drf.Name(), namespaceOrderFn) - } - // Register event handlers. 
ssn.AddEventHandler(&framework.EventHandler{ AllocateFunc: func(event *framework.Event) { @@ -467,13 +354,6 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) { drf.updateJobShare(job.Namespace, job.Name, attr) nsShare := -1.0 - if namespaceOrderEnabled { - nsOpt := drf.namespaceOpts[event.Task.Namespace] - nsOpt.allocated.Add(event.Task.Resreq) - - drf.updateNamespaceShare(event.Task.Namespace, nsOpt) - nsShare = nsOpt.share - } if hierarchyEnabled { queue := ssn.Queues[job.Queue] @@ -492,13 +372,6 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) { drf.updateJobShare(job.Namespace, job.Name, attr) nsShare := -1.0 - if namespaceOrderEnabled { - nsOpt := drf.namespaceOpts[event.Task.Namespace] - nsOpt.allocated.Sub(event.Task.Resreq) - - drf.updateNamespaceShare(event.Task.Namespace, nsOpt) - nsShare = nsOpt.share - } if hierarchyEnabled { queue := ssn.Queues[job.Queue] diff --git a/pkg/scheduler/util_test.go b/pkg/scheduler/util_test.go index 732bbea06e..c8943a3ad3 100644 --- a/pkg/scheduler/util_test.go +++ b/pkg/scheduler/util_test.go @@ -44,155 +44,148 @@ tiers: { Plugins: []conf.PluginOption{ { - Name: "priority", - EnabledJobOrder: &trueValue, - EnabledNamespaceOrder: &trueValue, - EnabledJobReady: &trueValue, - EnabledJobPipelined: &trueValue, - EnabledTaskOrder: &trueValue, - EnabledPreemptable: &trueValue, - EnabledReclaimable: &trueValue, - EnabledQueueOrder: &trueValue, - EnabledPredicate: &trueValue, - EnabledBestNode: &trueValue, - EnabledNodeOrder: &trueValue, - EnabledTargetJob: &trueValue, - EnabledReservedNodes: &trueValue, - EnabledJobEnqueued: &trueValue, - EnabledVictim: &trueValue, - EnabledJobStarving: &trueValue, - EnabledOverused: &trueValue, - EnabledAllocatable: &trueValue, + Name: "priority", + EnabledJobOrder: &trueValue, + EnabledJobReady: &trueValue, + EnabledJobPipelined: &trueValue, + EnabledTaskOrder: &trueValue, + EnabledPreemptable: &trueValue, + EnabledReclaimable: &trueValue, + EnabledQueueOrder: 
&trueValue, + EnabledPredicate: &trueValue, + EnabledBestNode: &trueValue, + EnabledNodeOrder: &trueValue, + EnabledTargetJob: &trueValue, + EnabledReservedNodes: &trueValue, + EnabledJobEnqueued: &trueValue, + EnabledVictim: &trueValue, + EnabledJobStarving: &trueValue, + EnabledOverused: &trueValue, + EnabledAllocatable: &trueValue, }, { - Name: "gang", - EnabledJobOrder: &trueValue, - EnabledNamespaceOrder: &trueValue, - EnabledJobReady: &trueValue, - EnabledJobPipelined: &trueValue, - EnabledTaskOrder: &trueValue, - EnabledPreemptable: &trueValue, - EnabledReclaimable: &trueValue, - EnabledQueueOrder: &trueValue, - EnabledPredicate: &trueValue, - EnabledBestNode: &trueValue, - EnabledNodeOrder: &trueValue, - EnabledTargetJob: &trueValue, - EnabledReservedNodes: &trueValue, - EnabledJobEnqueued: &trueValue, - EnabledVictim: &trueValue, - EnabledJobStarving: &trueValue, - EnabledOverused: &trueValue, - EnabledAllocatable: &trueValue, + Name: "gang", + EnabledJobOrder: &trueValue, + EnabledJobReady: &trueValue, + EnabledJobPipelined: &trueValue, + EnabledTaskOrder: &trueValue, + EnabledPreemptable: &trueValue, + EnabledReclaimable: &trueValue, + EnabledQueueOrder: &trueValue, + EnabledPredicate: &trueValue, + EnabledBestNode: &trueValue, + EnabledNodeOrder: &trueValue, + EnabledTargetJob: &trueValue, + EnabledReservedNodes: &trueValue, + EnabledJobEnqueued: &trueValue, + EnabledVictim: &trueValue, + EnabledJobStarving: &trueValue, + EnabledOverused: &trueValue, + EnabledAllocatable: &trueValue, }, { - Name: "conformance", - EnabledJobOrder: &trueValue, - EnabledNamespaceOrder: &trueValue, - EnabledJobReady: &trueValue, - EnabledJobPipelined: &trueValue, - EnabledTaskOrder: &trueValue, - EnabledPreemptable: &trueValue, - EnabledReclaimable: &trueValue, - EnabledQueueOrder: &trueValue, - EnabledPredicate: &trueValue, - EnabledBestNode: &trueValue, - EnabledNodeOrder: &trueValue, - EnabledTargetJob: &trueValue, - EnabledReservedNodes: &trueValue, - 
EnabledJobEnqueued: &trueValue, - EnabledVictim: &trueValue, - EnabledJobStarving: &trueValue, - EnabledOverused: &trueValue, - EnabledAllocatable: &trueValue, + Name: "conformance", + EnabledJobOrder: &trueValue, + EnabledJobReady: &trueValue, + EnabledJobPipelined: &trueValue, + EnabledTaskOrder: &trueValue, + EnabledPreemptable: &trueValue, + EnabledReclaimable: &trueValue, + EnabledQueueOrder: &trueValue, + EnabledPredicate: &trueValue, + EnabledBestNode: &trueValue, + EnabledNodeOrder: &trueValue, + EnabledTargetJob: &trueValue, + EnabledReservedNodes: &trueValue, + EnabledJobEnqueued: &trueValue, + EnabledVictim: &trueValue, + EnabledJobStarving: &trueValue, + EnabledOverused: &trueValue, + EnabledAllocatable: &trueValue, }, }, }, { Plugins: []conf.PluginOption{ { - Name: "drf", - EnabledJobOrder: &trueValue, - EnabledNamespaceOrder: &trueValue, - EnabledJobReady: &trueValue, - EnabledJobPipelined: &trueValue, - EnabledTaskOrder: &trueValue, - EnabledPreemptable: &trueValue, - EnabledReclaimable: &trueValue, - EnabledQueueOrder: &trueValue, - EnabledPredicate: &trueValue, - EnabledBestNode: &trueValue, - EnabledNodeOrder: &trueValue, - EnabledTargetJob: &trueValue, - EnabledReservedNodes: &trueValue, - EnabledJobEnqueued: &trueValue, - EnabledVictim: &trueValue, - EnabledJobStarving: &trueValue, - EnabledOverused: &trueValue, - EnabledAllocatable: &trueValue, + Name: "drf", + EnabledJobOrder: &trueValue, + EnabledJobReady: &trueValue, + EnabledJobPipelined: &trueValue, + EnabledTaskOrder: &trueValue, + EnabledPreemptable: &trueValue, + EnabledReclaimable: &trueValue, + EnabledQueueOrder: &trueValue, + EnabledPredicate: &trueValue, + EnabledBestNode: &trueValue, + EnabledNodeOrder: &trueValue, + EnabledTargetJob: &trueValue, + EnabledReservedNodes: &trueValue, + EnabledJobEnqueued: &trueValue, + EnabledVictim: &trueValue, + EnabledJobStarving: &trueValue, + EnabledOverused: &trueValue, + EnabledAllocatable: &trueValue, }, { - Name: "predicates", - 
EnabledJobOrder: &trueValue, - EnabledNamespaceOrder: &trueValue, - EnabledJobReady: &trueValue, - EnabledJobPipelined: &trueValue, - EnabledTaskOrder: &trueValue, - EnabledPreemptable: &trueValue, - EnabledReclaimable: &trueValue, - EnabledQueueOrder: &trueValue, - EnabledPredicate: &trueValue, - EnabledBestNode: &trueValue, - EnabledNodeOrder: &trueValue, - EnabledTargetJob: &trueValue, - EnabledReservedNodes: &trueValue, - EnabledJobEnqueued: &trueValue, - EnabledVictim: &trueValue, - EnabledJobStarving: &trueValue, - EnabledOverused: &trueValue, - EnabledAllocatable: &trueValue, + Name: "predicates", + EnabledJobOrder: &trueValue, + EnabledJobReady: &trueValue, + EnabledJobPipelined: &trueValue, + EnabledTaskOrder: &trueValue, + EnabledPreemptable: &trueValue, + EnabledReclaimable: &trueValue, + EnabledQueueOrder: &trueValue, + EnabledPredicate: &trueValue, + EnabledBestNode: &trueValue, + EnabledNodeOrder: &trueValue, + EnabledTargetJob: &trueValue, + EnabledReservedNodes: &trueValue, + EnabledJobEnqueued: &trueValue, + EnabledVictim: &trueValue, + EnabledJobStarving: &trueValue, + EnabledOverused: &trueValue, + EnabledAllocatable: &trueValue, }, { - Name: "proportion", - EnabledJobOrder: &trueValue, - EnabledNamespaceOrder: &trueValue, - EnabledJobReady: &trueValue, - EnabledJobPipelined: &trueValue, - EnabledTaskOrder: &trueValue, - EnabledPreemptable: &trueValue, - EnabledReclaimable: &trueValue, - EnabledQueueOrder: &trueValue, - EnabledPredicate: &trueValue, - EnabledBestNode: &trueValue, - EnabledNodeOrder: &trueValue, - EnabledTargetJob: &trueValue, - EnabledReservedNodes: &trueValue, - EnabledJobEnqueued: &trueValue, - EnabledVictim: &trueValue, - EnabledJobStarving: &trueValue, - EnabledOverused: &trueValue, - EnabledAllocatable: &trueValue, + Name: "proportion", + EnabledJobOrder: &trueValue, + EnabledJobReady: &trueValue, + EnabledJobPipelined: &trueValue, + EnabledTaskOrder: &trueValue, + EnabledPreemptable: &trueValue, + EnabledReclaimable: 
&trueValue, + EnabledQueueOrder: &trueValue, + EnabledPredicate: &trueValue, + EnabledBestNode: &trueValue, + EnabledNodeOrder: &trueValue, + EnabledTargetJob: &trueValue, + EnabledReservedNodes: &trueValue, + EnabledJobEnqueued: &trueValue, + EnabledVictim: &trueValue, + EnabledJobStarving: &trueValue, + EnabledOverused: &trueValue, + EnabledAllocatable: &trueValue, }, { - Name: "nodeorder", - EnabledJobOrder: &trueValue, - EnabledNamespaceOrder: &trueValue, - EnabledJobReady: &trueValue, - EnabledJobPipelined: &trueValue, - EnabledTaskOrder: &trueValue, - EnabledPreemptable: &trueValue, - EnabledReclaimable: &trueValue, - EnabledQueueOrder: &trueValue, - EnabledPredicate: &trueValue, - EnabledBestNode: &trueValue, - EnabledNodeOrder: &trueValue, - EnabledTargetJob: &trueValue, - EnabledReservedNodes: &trueValue, - EnabledJobEnqueued: &trueValue, - EnabledVictim: &trueValue, - EnabledJobStarving: &trueValue, - EnabledOverused: &trueValue, - EnabledAllocatable: &trueValue, + Name: "nodeorder", + EnabledJobOrder: &trueValue, + EnabledJobReady: &trueValue, + EnabledJobPipelined: &trueValue, + EnabledTaskOrder: &trueValue, + EnabledPreemptable: &trueValue, + EnabledReclaimable: &trueValue, + EnabledQueueOrder: &trueValue, + EnabledPredicate: &trueValue, + EnabledBestNode: &trueValue, + EnabledNodeOrder: &trueValue, + EnabledTargetJob: &trueValue, + EnabledReservedNodes: &trueValue, + EnabledJobEnqueued: &trueValue, + EnabledVictim: &trueValue, + EnabledJobStarving: &trueValue, + EnabledOverused: &trueValue, + EnabledAllocatable: &trueValue, }, }, }, diff --git a/test/e2e/schedulingbase/job_scheduling.go b/test/e2e/schedulingbase/job_scheduling.go index f2c0fc896b..5cb1ecb07f 100644 --- a/test/e2e/schedulingbase/job_scheduling.go +++ b/test/e2e/schedulingbase/job_scheduling.go @@ -442,120 +442,6 @@ var _ = Describe("Job E2E Test", func() { Expect(err).NotTo(HaveOccurred()) }) - It("Namespace Fair Share", func() { - Skip("Failed when add yaml and test case may fail in 
some condition") - ctx := e2eutil.InitTestContext(e2eutil.Options{}) - defer e2eutil.CleanupTestContext(ctx) - const fairShareNamespace = "fairshare" - _, err := ctx.Kubeclient.CoreV1().Namespaces().Create(context.TODO(), - &v1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: fairShareNamespace, - }, - }, - metav1.CreateOptions{}) - Expect(err).NotTo(HaveOccurred()) - defer func() { - deleteForeground := metav1.DeletePropagationForeground - err := ctx.Kubeclient.CoreV1().Namespaces().Delete(context.TODO(), - fairShareNamespace, - metav1.DeleteOptions{ - PropagationPolicy: &deleteForeground, - }) - Expect(err).NotTo(HaveOccurred()) - - err = wait.Poll(100*time.Millisecond, e2eutil.FiveMinute, e2eutil.NamespaceNotExistWithName(ctx, fairShareNamespace)) - Expect(err).NotTo(HaveOccurred()) - }() - - slot := e2eutil.HalfCPU - rep := e2eutil.ClusterSize(ctx, slot) - - createJobToNamespace := func(namespace string, index int, replica int32) *vcbatch.Job { - spec := &e2eutil.JobSpec{ - Name: fmt.Sprintf("namespace-fair-share-%s-%d", namespace, index), - Namespace: namespace, - Tasks: []e2eutil.TaskSpec{ - { - Img: e2eutil.DefaultNginxImage, - Command: "sleep 10000", - Req: slot, - Min: 2, - Rep: replica, - }, - }, - } - job := e2eutil.CreateJob(ctx, spec) - return job - } - - By("occupy all cluster resources") - occupiedJob := createJobToNamespace("default", 123, rep*2) - err = e2eutil.WaitJobReady(ctx, occupiedJob) - Expect(err).NotTo(HaveOccurred()) - - for i := 0; i < int(rep); i++ { - createJobToNamespace(ctx.Namespace, i, 2) - createJobToNamespace(fairShareNamespace, i, 2) - } - - By("release occupied cluster resources") - deleteBackground := metav1.DeletePropagationBackground - err = ctx.Vcclient.BatchV1alpha1().Jobs(occupiedJob.Namespace).Delete(context.TODO(), - occupiedJob.Name, - metav1.DeleteOptions{ - PropagationPolicy: &deleteBackground, - }) - Expect(err).NotTo(HaveOccurred()) - - By("wait occupied cluster resources releasing") - err = 
e2eutil.WaitJobCleanedUp(ctx, occupiedJob) - Expect(err).NotTo(HaveOccurred()) - - By("wait pod in fs/test namespace scheduled") - fsScheduledPod := 0 - testScheduledPod := 0 - expectPod := int(rep) - if expectPod%1 == 1 { - expectPod-- - } - err = wait.Poll(100*time.Millisecond, e2eutil.FiveMinute, func() (bool, error) { - fsScheduledPod = 0 - testScheduledPod = 0 - - pods, err := ctx.Kubeclient.CoreV1().Pods(fairShareNamespace).List(context.TODO(), metav1.ListOptions{}) - if err != nil { - return false, err - } - for _, pod := range pods.Items { - if e2eutil.IsPodScheduled(&pod) { - fsScheduledPod++ - } - } - - pods, err = ctx.Kubeclient.CoreV1().Pods(ctx.Namespace).List(context.TODO(), metav1.ListOptions{}) - if err != nil { - return false, err - } - for _, pod := range pods.Items { - if e2eutil.IsPodScheduled(&pod) { - testScheduledPod++ - } - } - - // gang scheduling - if testScheduledPod+fsScheduledPod == expectPod { - return true, nil - } - - return false, nil - }) - Expect(err).NotTo(HaveOccurred()) - - Expect(testScheduledPod).Should(BeNumerically(">=", expectPod/2-1), fmt.Sprintf("expectPod %d, fsScheduledPod %d, testScheduledPod %d", expectPod, fsScheduledPod, testScheduledPod)) - Expect(testScheduledPod).Should(BeNumerically("<=", expectPod/2+1), fmt.Sprintf("expectPod %d, fsScheduledPod %d, testScheduledPod %d", expectPod, fsScheduledPod, testScheduledPod)) - }) - It("Queue Fair Share", func() { Skip("Failed when add yaml, test case may fail in some condition") q1, q2 := "q1", "q2" diff --git a/test/e2e/util/configmap.go b/test/e2e/util/configmap.go index 52743dbddf..fb9f096696 100644 --- a/test/e2e/util/configmap.go +++ b/test/e2e/util/configmap.go @@ -130,8 +130,6 @@ type PluginOption struct { Name string `yaml:"name"` // EnabledJobOrder defines whether jobOrderFn is enabled EnabledJobOrder *bool `yaml:"enableJobOrder,omitempty"` - // EnabledNamespaceOrder defines whether namespaceOrderFn is enabled - EnabledNamespaceOrder *bool 
`yaml:"enableNamespaceOrder,omitempty"` // EnabledHierachy defines whether hierarchical sharing is enabled EnabledHierarchy *bool `yaml:"enableHierarchy,omitempty"` // EnabledJobReady defines whether jobReadyFn is enabled