diff --git a/installer/helm/chart/volcano/templates/scheduler.yaml b/installer/helm/chart/volcano/templates/scheduler.yaml index 5a0e53f210..26777ef74c 100644 --- a/installer/helm/chart/volcano/templates/scheduler.yaml +++ b/installer/helm/chart/volcano/templates/scheduler.yaml @@ -48,6 +48,9 @@ rules: - apiGroups: [""] resources: ["namespaces"] verbs: ["list", "watch"] + - apiGroups: [""] + resources: ["resourcequotas"] + verbs: ["list", "watch"] - apiGroups: ["storage.k8s.io"] resources: ["storageclasses"] verbs: ["list", "watch"] diff --git a/installer/volcano-development.yaml b/installer/volcano-development.yaml index a4c7eed6ee..346ad529f9 100644 --- a/installer/volcano-development.yaml +++ b/installer/volcano-development.yaml @@ -67,6 +67,9 @@ rules: - apiGroups: [""] resources: ["namespaces"] verbs: ["list", "watch"] + - apiGroups: [""] + resources: ["resourcequotas"] + verbs: ["list", "watch"] - apiGroups: ["storage.k8s.io"] resources: ["storageclasses"] verbs: ["list", "watch"] diff --git a/pkg/apis/scheduling/v1alpha1/zz_generated.conversion.go b/pkg/apis/scheduling/v1alpha1/zz_generated.conversion.go index 447f3f2af9..67690e9ea1 100644 --- a/pkg/apis/scheduling/v1alpha1/zz_generated.conversion.go +++ b/pkg/apis/scheduling/v1alpha1/zz_generated.conversion.go @@ -126,6 +126,11 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } + if err := s.AddConversionFunc((*scheduling.QueueStatus)(nil), (*QueueStatus)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_scheduling_QueueStatus_To_v1alpha1_QueueStatus(a.(*scheduling.QueueStatus), b.(*QueueStatus), scope) + }); err != nil { + return err + } return nil } diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index eb0eed3310..80a1ee13e2 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -42,9 +42,20 @@ func (alloc *allocateAction) Initialize() {} func (alloc *allocateAction) Execute(ssn *framework.Session) { glog.V(3).Infof("Enter Allocate ...") defer glog.V(3).Infof("Leaving Allocate ...") - // further - queues := util.NewPriorityQueue(ssn.QueueOrderFn) - jobsMap := map[api.QueueID]*util.PriorityQueue{} + + // the allocation for pod may have many stages + // 1. pick a namespace named N (using ssn.NamespaceOrderFn) + // 2. pick a queue named Q from N (using ssn.QueueOrderFn) + // 3. pick a job named J from Q (using ssn.JobOrderFn) + // 4. pick a task T from J (using ssn.TaskOrderFn) + // 5. use predicateFn to filter out node that T can not be allocated on. + // 6. 
use ssn.NodeOrderFn to choose the best node and assign it to T
+
+	namespaces := util.NewPriorityQueue(ssn.NamespaceOrderFn)
+
+	// jobsMap is map[api.NamespaceName]map[api.QueueID]PriorityQueue(*api.JobInfo),
+	// used to find the job with the highest priority in a given queue and namespace.
+	jobsMap := map[api.NamespaceName]map[api.QueueID]*util.PriorityQueue{}

 	for _, job := range ssn.Jobs {
 		if job.PodGroup.Status.Phase == scheduling.PodGroupPending {
@@ -55,23 +66,32 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
 			continue
 		}

-		if queue, found := ssn.Queues[job.Queue]; found {
-			queues.Push(queue)
-		} else {
+		if _, found := ssn.Queues[job.Queue]; !found {
 			glog.Warningf("Skip adding Job <%s/%s> because its queue %s is not found",
 				job.Namespace, job.Name, job.Queue)
 			continue
 		}

-		if _, found := jobsMap[job.Queue]; !found {
-			jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
+		namespace := api.NamespaceName(job.Namespace)
+		queueMap, found := jobsMap[namespace]
+		if !found {
+			namespaces.Push(namespace)
+
+			queueMap = make(map[api.QueueID]*util.PriorityQueue)
+			jobsMap[namespace] = queueMap
+		}
+
+		jobs, found := queueMap[job.Queue]
+		if !found {
+			jobs = util.NewPriorityQueue(ssn.JobOrderFn)
+			queueMap[job.Queue] = jobs
 		}

 		glog.V(4).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
-		jobsMap[job.Queue].Push(job)
+		jobs.Push(job)
 	}

-	glog.V(3).Infof("Try to allocate resource to %d Queues", len(jobsMap))
+	glog.V(3).Infof("Try to allocate resource to %d Namespaces", len(jobsMap))

 	pendingTasks := map[api.JobID]*util.PriorityQueue{}

@@ -92,21 +112,47 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
 		return ssn.PredicateFn(task, node)
 	}

+	// To pick a <namespace, queue, job> tuple for a job, we choose the namespace first,
+	// because we believe the number of queues is smaller than the number of namespaces in most cases.
+	// Also, picking the namespace first keeps resource usage balanced across namespaces.
 	for {
-		if queues.Empty() {
+		if namespaces.Empty() {
 			break
 		}

-		queue := queues.Pop().(*api.QueueInfo)
-		if ssn.Overused(queue) {
-			glog.V(3).Infof("Queue <%s> is overused, ignore it.", queue.Name)
-			continue
+		// pick a namespace from the namespaces PriorityQueue
+		namespace := namespaces.Pop().(api.NamespaceName)
+
+		queueInNamespace := jobsMap[namespace]
+
+		// pick a queue for the given namespace
+		//
+		// This block uses a linear scan with O(n) time complexity.
+		// A PriorityQueue could not be used here,
+		// because allocating a job changes the priority of its queue among all namespaces,
+		// and the PriorityQueue has no way to update the priority of a single queue.
+ var queue *api.QueueInfo + for queueId := range queueInNamespace { + currentQueue := ssn.Queues[queueId] + if ssn.Overused(currentQueue) { + glog.V(3).Infof("Namespace <%s> Queue <%s> is overused, ignore it.", namespace, currentQueue.Name) + delete(queueInNamespace, queueId) + continue + } + + if queue == nil || ssn.QueueOrderFn(currentQueue, queue) { + queue = currentQueue + } } - jobs, found := jobsMap[queue.UID] + if queue == nil { + glog.V(3).Infof("Namespace <%s> have no queue, skip it", namespace) + continue + } - glog.V(3).Infof("Try to allocate resource to Jobs in Queue <%v>", queue.Name) + glog.V(3).Infof("Try to allocate resource to Jobs in Namespace <%s> Queue <%v>", namespace, queue.Name) + jobs, found := queueInNamespace[queue.UID] if !found || jobs.Empty() { glog.V(4).Infof("Can not find jobs for queue %s.", queue.Name) continue @@ -194,8 +240,9 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) { } else { stmt.Discard() } - // Added Queue back until no job in Queue. - queues.Push(queue) + + // Added Namespace back until no job in Namespace. + namespaces.Push(namespace) } } diff --git a/pkg/scheduler/actions/allocate/allocate_test.go b/pkg/scheduler/actions/allocate/allocate_test.go index 7b2a50cf4f..8b64cfc018 100644 --- a/pkg/scheduler/actions/allocate/allocate_test.go +++ b/pkg/scheduler/actions/allocate/allocate_test.go @@ -182,9 +182,10 @@ func TestAllocate(t *testing.T) { { Plugins: []conf.PluginOption{ { - Name: "drf", - EnabledPreemptable: &trueValue, - EnabledJobOrder: &trueValue, + Name: "drf", + EnabledPreemptable: &trueValue, + EnabledJobOrder: &trueValue, + EnabledNamespaceOrder: &trueValue, }, { Name: "proportion", diff --git a/pkg/scheduler/api/cluster_info.go b/pkg/scheduler/api/cluster_info.go index 40f9b9f6bc..f3925c0550 100644 --- a/pkg/scheduler/api/cluster_info.go +++ b/pkg/scheduler/api/cluster_info.go @@ -20,9 +20,10 @@ import "fmt" // ClusterInfo is a snapshot of cluster by cache. type ClusterInfo struct { - Jobs map[JobID]*JobInfo - Nodes map[string]*NodeInfo - Queues map[QueueID]*QueueInfo + Jobs map[JobID]*JobInfo + Nodes map[string]*NodeInfo + Queues map[QueueID]*QueueInfo + NamespaceInfo map[NamespaceName]*NamespaceInfo } func (ci ClusterInfo) String() string { @@ -57,5 +58,13 @@ func (ci ClusterInfo) String() string { } } + if len(ci.NamespaceInfo) != 0 { + str = str + "Namespaces:\n" + for _, ns := range ci.NamespaceInfo { + str = str + fmt.Sprintf("\t Namespace(%s) Weight(%v)\n", + ns.Name, ns.Weight) + } + } + return str } diff --git a/pkg/scheduler/api/namespace_info.go b/pkg/scheduler/api/namespace_info.go new file mode 100644 index 0000000000..80ce552a19 --- /dev/null +++ b/pkg/scheduler/api/namespace_info.go @@ -0,0 +1,146 @@ +/* +Copyright 2018 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package api + +import ( + "fmt" + + "github.com/golang/glog" + + v1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/cache" +) + +// NamespaceName is name of namespace +type NamespaceName string + +const ( + // NamespaceWeightKey is the key in ResourceQuota.spec.hard indicating the weight of this namespace + NamespaceWeightKey = "volcano.sh/namespace.weight" + // DefaultNamespaceWeight is the default weight of namespace + DefaultNamespaceWeight = 1 +) + +// NamespaceInfo records information of namespace +type NamespaceInfo struct { + // Name is the name of this namespace + Name NamespaceName + // Weight is the highest weight among many ResourceQuota. + Weight int64 +} + +// GetWeight returns weight of a namespace, any invalid case would get default value +func (n *NamespaceInfo) GetWeight() int64 { + if n == nil || n.Weight == 0 { + return DefaultNamespaceWeight + } + return n.Weight +} + +type quotaItem struct { + name string + weight int64 +} + +func quotaItemKeyFunc(obj interface{}) (string, error) { + item, ok := obj.(*quotaItem) + if !ok { + return "", fmt.Errorf("obj with type %T could not parse", obj) + } + return item.name, nil +} + +// for big root heap +func quotaItemLessFunc(a interface{}, b interface{}) bool { + A := a.(*quotaItem) + B := b.(*quotaItem) + return A.weight > B.weight +} + +// NamespaceCollection will record all details about namespace +type NamespaceCollection struct { + Name string + + quotaWeight *cache.Heap +} + +// NewNamespaceCollection creates new NamespaceCollection object to record all information about a namespace +func NewNamespaceCollection(name string) *NamespaceCollection { + n := &NamespaceCollection{ + Name: name, + quotaWeight: cache.NewHeap(quotaItemKeyFunc, quotaItemLessFunc), + } + // add at least one item into quotaWeight. 
+ // Because cache.Heap.Pop would be blocked until queue is not empty + n.updateWeight("aItem{ + name: NamespaceWeightKey, + weight: DefaultNamespaceWeight, + }) + return n +} + +func (n *NamespaceCollection) deleteWeight(q *quotaItem) { + n.quotaWeight.Delete(q) +} + +func (n *NamespaceCollection) updateWeight(q *quotaItem) { + n.quotaWeight.Update(q) +} + +func itemFromQuota(quota *v1.ResourceQuota) *quotaItem { + var weight int64 = DefaultNamespaceWeight + + quotaWeight, ok := quota.Spec.Hard[NamespaceWeightKey] + if ok { + weight = quotaWeight.Value() + } + + item := "aItem{ + name: quota.Name, + weight: weight, + } + return item +} + +// Update modify the registered information according quota object +func (n *NamespaceCollection) Update(quota *v1.ResourceQuota) { + n.updateWeight(itemFromQuota(quota)) +} + +// Delete remove the registered information according quota object +func (n *NamespaceCollection) Delete(quota *v1.ResourceQuota) { + n.deleteWeight(itemFromQuota(quota)) +} + +// Snapshot will clone a NamespaceInfo without Heap according NamespaceCollection +func (n *NamespaceCollection) Snapshot() *NamespaceInfo { + var weight int64 = DefaultNamespaceWeight + + obj, err := n.quotaWeight.Pop() + if err != nil { + glog.Warningf("namespace %s, quota weight meets error %v when pop", n.Name, err) + } else { + item := obj.(*quotaItem) + weight = item.weight + n.quotaWeight.Add(item) + } + + return &NamespaceInfo{ + Name: NamespaceName(n.Name), + Weight: weight, + } +} diff --git a/pkg/scheduler/api/namespace_info_test.go b/pkg/scheduler/api/namespace_info_test.go new file mode 100644 index 0000000000..5f5b05adbd --- /dev/null +++ b/pkg/scheduler/api/namespace_info_test.go @@ -0,0 +1,103 @@ +/* +Copyright 2018 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ +package api + +import ( + "testing" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func newQuota(name string, weight int) *v1.ResourceQuota { + q := &v1.ResourceQuota{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: v1.ResourceQuotaSpec{ + Hard: make(v1.ResourceList), + }, + } + + if weight >= 0 { + q.Spec.Hard[v1.ResourceName(NamespaceWeightKey)] = *resource.NewQuantity(int64(weight), resource.DecimalSI) + } + + return q +} + +func TestNamespaceCollection(t *testing.T) { + c := NewNamespaceCollection("testCollection") + c.Update(newQuota("abc", 123)) + c.Update(newQuota("abc", 456)) + c.Update(newQuota("def", -1)) + c.Update(newQuota("def", 16)) + c.Update(newQuota("ghi", 0)) + + info := c.Snapshot() + if info.Weight != 456 { + t.Errorf("weight of namespace should be %d, but not %d", 456, info.Weight) + } + + c.Delete(newQuota("abc", 0)) + + info = c.Snapshot() + if info.Weight != 16 { + t.Errorf("weight of namespace should be %d, but not %d", 16, info.Weight) + } + + c.Delete(newQuota("abc", 0)) + c.Delete(newQuota("def", 15)) + c.Delete(newQuota("ghi", -1)) + + info = c.Snapshot() + if info.Weight != DefaultNamespaceWeight { + t.Errorf("weight of namespace should be default weight %d, but not %d", DefaultNamespaceWeight, info.Weight) + } +} + +func TestEmptyNamespaceCollection(t *testing.T) { + c := NewNamespaceCollection("testEmptyCollection") + + info := c.Snapshot() + if info.Weight != DefaultNamespaceWeight { + t.Errorf("weight of namespace should be %d, but not %d", DefaultNamespaceWeight, info.Weight) + } + + // snapshot can be called anytime + info = c.Snapshot() + if info.Weight != DefaultNamespaceWeight { + t.Errorf("weight of namespace should be %d, but not %d", DefaultNamespaceWeight, info.Weight) + } + + c.Delete(newQuota("abc", 0)) + + info = c.Snapshot() + if info.Weight != DefaultNamespaceWeight { + t.Errorf("weight of namespace should be %d, but not %d", DefaultNamespaceWeight, info.Weight) + } + + c.Delete(newQuota("abc", 0)) + c.Delete(newQuota("def", 15)) + c.Delete(newQuota("ghi", -1)) + + info = c.Snapshot() + if info.Weight != DefaultNamespaceWeight { + t.Errorf("weight of namespace should be default weight %d, but not %d", DefaultNamespaceWeight, info.Weight) + } +} diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 0d0751f0a9..5c668231a3 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -95,6 +95,7 @@ type SchedulerCache struct { pvcInformer infov1.PersistentVolumeClaimInformer scInformer storagev1.StorageClassInformer pcInformer schedv1.PriorityClassInformer + quotaInformer infov1.ResourceQuotaInformer Binder Binder Evictor Evictor @@ -110,6 +111,8 @@ type SchedulerCache struct { defaultPriorityClass *v1beta1.PriorityClass defaultPriority int32 + NamespaceCollection map[string]*kbapi.NamespaceCollection + errTasks workqueue.RateLimitingInterface deletedJobs workqueue.RateLimitingInterface } @@ -296,6 +299,8 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s kbclient: kbClient, defaultQueue: defaultQueue, schedulerName: schedulerName, + + NamespaceCollection: make(map[string]*kbapi.NamespaceCollection), } // Prepare event clients. 
@@ -381,6 +386,13 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s DeleteFunc: sc.DeletePriorityClass, }) + sc.quotaInformer = informerFactory.Core().V1().ResourceQuotas() + sc.quotaInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: sc.AddResourceQuota, + UpdateFunc: sc.UpdateResourceQuota, + DeleteFunc: sc.DeleteResourceQuota, + }) + kbinformer := kbinfo.NewSharedInformerFactory(sc.kbclient, 0) // create informer for PodGroup(v1alpha1) information sc.podGroupInformerV1alpha1 = kbinformer.Scheduling().V1alpha1().PodGroups() @@ -429,6 +441,7 @@ func (sc *SchedulerCache) Run(stopCh <-chan struct{}) { go sc.scInformer.Informer().Run(stopCh) go sc.queueInformerV1alpha1.Informer().Run(stopCh) go sc.queueInformerV1alpha2.Informer().Run(stopCh) + go sc.quotaInformer.Informer().Run(stopCh) if options.ServerOpts.EnablePriorityClass { go sc.pcInformer.Informer().Run(stopCh) @@ -457,6 +470,7 @@ func (sc *SchedulerCache) WaitForCacheSync(stopCh <-chan struct{}) bool { sc.scInformer.Informer().HasSynced, sc.queueInformerV1alpha1.Informer().HasSynced, sc.queueInformerV1alpha2.Informer().HasSynced, + sc.quotaInformer.Informer().HasSynced, } if options.ServerOpts.EnablePriorityClass { informerSynced = append(informerSynced, sc.pcInformer.Informer().HasSynced) @@ -685,9 +699,10 @@ func (sc *SchedulerCache) Snapshot() *kbapi.ClusterInfo { defer sc.Mutex.Unlock() snapshot := &kbapi.ClusterInfo{ - Nodes: make(map[string]*kbapi.NodeInfo), - Jobs: make(map[kbapi.JobID]*kbapi.JobInfo), - Queues: make(map[kbapi.QueueID]*kbapi.QueueInfo), + Nodes: make(map[string]*kbapi.NodeInfo), + Jobs: make(map[kbapi.JobID]*kbapi.JobInfo), + Queues: make(map[kbapi.QueueID]*kbapi.QueueInfo), + NamespaceInfo: make(map[kbapi.NamespaceName]*kbapi.NamespaceInfo), } for _, value := range sc.Nodes { @@ -726,6 +741,13 @@ func (sc *SchedulerCache) Snapshot() *kbapi.ClusterInfo { wg.Done() } + for _, value := range sc.NamespaceCollection { + info := value.Snapshot() + snapshot.NamespaceInfo[info.Name] = info + glog.V(4).Infof("Namespace %s has weight %v", + value.Name, info.GetWeight()) + } + for _, value := range sc.Jobs { // If no scheduling spec, does not handle it. 
if value.PodGroup == nil && value.PDB == nil { @@ -780,6 +802,15 @@ func (sc *SchedulerCache) String() string { } } + if len(sc.NamespaceCollection) != 0 { + str = str + "Namespaces:\n" + for _, ns := range sc.NamespaceCollection { + info := ns.Snapshot() + str = str + fmt.Sprintf("\t Namespace(%s) Weight(%v)\n", + info.Name, info.Weight) + } + } + return str } diff --git a/pkg/scheduler/cache/event_handlers.go b/pkg/scheduler/cache/event_handlers.go index 6ee0c62b8b..a823996a16 100644 --- a/pkg/scheduler/cache/event_handlers.go +++ b/pkg/scheduler/cache/event_handlers.go @@ -954,3 +954,80 @@ func (sc *SchedulerCache) addPriorityClass(pc *v1beta1.PriorityClass) { sc.PriorityClasses[pc.Name] = pc } + +func (sc *SchedulerCache) updateResourceQuota(quota *v1.ResourceQuota) { + collection, ok := sc.NamespaceCollection[quota.Namespace] + if !ok { + collection = kbapi.NewNamespaceCollection(quota.Namespace) + sc.NamespaceCollection[quota.Namespace] = collection + } + + collection.Update(quota) +} + +func (sc *SchedulerCache) deleteResourceQuota(quota *v1.ResourceQuota) { + collection, ok := sc.NamespaceCollection[quota.Namespace] + if !ok { + return + } + + collection.Delete(quota) +} + +// DeleteResourceQuota delete ResourceQuota from the scheduler cache +func (sc *SchedulerCache) DeleteResourceQuota(obj interface{}) { + var r *v1.ResourceQuota + switch t := obj.(type) { + case *v1.ResourceQuota: + r = t + case cache.DeletedFinalStateUnknown: + var ok bool + r, ok = t.Obj.(*v1.ResourceQuota) + if !ok { + glog.Errorf("Cannot convert to *v1.ResourceQuota: %v", t.Obj) + return + } + default: + glog.Errorf("Cannot convert to *v1.ResourceQuota: %v", t) + return + } + + sc.Mutex.Lock() + defer sc.Mutex.Unlock() + + glog.V(3).Infof("Delete ResourceQuota <%s/%v> in cache", r.Namespace, r.Name) + sc.deleteResourceQuota(r) +} + +// UpdateResourceQuota update ResourceQuota to scheduler cache +func (sc *SchedulerCache) UpdateResourceQuota(oldObj, newObj interface{}) { + newR, ok := newObj.(*v1.ResourceQuota) + if !ok { + glog.Errorf("Cannot convert newObj to *v1.ResourceQuota: %v", newObj) + return + } + + sc.Mutex.Lock() + defer sc.Mutex.Unlock() + + glog.V(3).Infof("Update ResourceQuota <%s/%v> in cache, with spec: %v.", newR.Namespace, newR.Name, newR.Spec.Hard) + sc.updateResourceQuota(newR) +} + +// AddResourceQuota add ResourceQuota to scheduler cache +func (sc *SchedulerCache) AddResourceQuota(obj interface{}) { + var r *v1.ResourceQuota + switch t := obj.(type) { + case *v1.ResourceQuota: + r = t + default: + glog.Errorf("Cannot convert to *v1.ResourceQuota: %v", t) + return + } + + sc.Mutex.Lock() + defer sc.Mutex.Unlock() + + glog.V(3).Infof("Add ResourceQuota <%s/%v> in cache, with spec: %v.", r.Namespace, r.Name, r.Spec.Hard) + sc.updateResourceQuota(r) +} diff --git a/pkg/scheduler/conf/scheduler_conf.go b/pkg/scheduler/conf/scheduler_conf.go index 516e07a9a5..a45c6e4697 100644 --- a/pkg/scheduler/conf/scheduler_conf.go +++ b/pkg/scheduler/conf/scheduler_conf.go @@ -35,6 +35,8 @@ type PluginOption struct { Name string `yaml:"name"` // EnabledJobOrder defines whether jobOrderFn is enabled EnabledJobOrder *bool `yaml:"enableJobOrder"` + // EnabledNamespaceOrder defines whether namespaceOrderFn is enabled + EnabledNamespaceOrder *bool `yaml:"enableNamespaceOrder"` // EnabledJobReady defines whether jobReadyFn is enabled EnabledJobReady *bool `yaml:"enableJobReady"` // EnabledJobPipelined defines whether jobPipelinedFn is enabled diff --git a/pkg/scheduler/framework/session.go 
b/pkg/scheduler/framework/session.go index a472920827..5c5548a046 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -41,9 +41,11 @@ type Session struct { podGroupStatus map[api.JobID]*scheduling.PodGroupStatus - Jobs map[api.JobID]*api.JobInfo - Nodes map[string]*api.NodeInfo - Queues map[api.QueueID]*api.QueueInfo + Jobs map[api.JobID]*api.JobInfo + Nodes map[string]*api.NodeInfo + Queues map[api.QueueID]*api.QueueInfo + NamespaceInfo map[api.NamespaceName]*api.NamespaceInfo + Backlog []*api.JobInfo Tiers []conf.Tier @@ -52,6 +54,7 @@ type Session struct { jobOrderFns map[string]api.CompareFn queueOrderFns map[string]api.CompareFn taskOrderFns map[string]api.CompareFn + namespaceOrderFns map[string]api.CompareFn predicateFns map[string]api.PredicateFn nodeOrderFns map[string]api.NodeOrderFn batchNodeOrderFns map[string]api.BatchNodeOrderFn @@ -81,6 +84,7 @@ func openSession(cache cache.Cache) *Session { jobOrderFns: map[string]api.CompareFn{}, queueOrderFns: map[string]api.CompareFn{}, taskOrderFns: map[string]api.CompareFn{}, + namespaceOrderFns: map[string]api.CompareFn{}, predicateFns: map[string]api.PredicateFn{}, nodeOrderFns: map[string]api.NodeOrderFn{}, batchNodeOrderFns: map[string]api.BatchNodeOrderFn{}, @@ -126,6 +130,7 @@ func openSession(cache cache.Cache) *Session { ssn.Nodes = snapshot.Nodes ssn.Queues = snapshot.Queues + ssn.NamespaceInfo = snapshot.NamespaceInfo glog.V(3).Infof("Open Session %v with <%d> Job and <%d> Queues", ssn.UID, len(ssn.Jobs), len(ssn.Queues)) @@ -143,6 +148,7 @@ func closeSession(ssn *Session) { ssn.plugins = nil ssn.eventHandlers = nil ssn.jobOrderFns = nil + ssn.namespaceOrderFns = nil ssn.queueOrderFns = nil glog.V(3).Infof("Close Session %v", ssn.UID) diff --git a/pkg/scheduler/framework/session_plugins.go b/pkg/scheduler/framework/session_plugins.go index d42e499050..e1af87c35a 100644 --- a/pkg/scheduler/framework/session_plugins.go +++ b/pkg/scheduler/framework/session_plugins.go @@ -36,6 +36,11 @@ func (ssn *Session) AddTaskOrderFn(name string, cf api.CompareFn) { ssn.taskOrderFns[name] = cf } +// AddNamespaceOrderFn add namespace order function +func (ssn *Session) AddNamespaceOrderFn(name string, cf api.CompareFn) { + ssn.namespaceOrderFns[name] = cf +} + // AddPreemptableFn add preemptable function func (ssn *Session) AddPreemptableFn(name string, cf api.EvictableFn) { ssn.preemptableFns[name] = cf @@ -304,6 +309,31 @@ func (ssn *Session) JobOrderFn(l, r interface{}) bool { } +// NamespaceOrderFn invoke namespaceorder function of the plugins +func (ssn *Session) NamespaceOrderFn(l, r interface{}) bool { + for _, tier := range ssn.Tiers { + for _, plugin := range tier.Plugins { + if !isEnabled(plugin.EnabledNamespaceOrder) { + continue + } + nof, found := ssn.namespaceOrderFns[plugin.Name] + if !found { + continue + } + if j := nof(l, r); j != 0 { + return j < 0 + } + } + } + + // TODO(lminzhw): if all NamespaceOrderFn treat these two namespace as the same, + // we should make the job order have its affect among namespaces. 
+ // or just schedule namespace one by one + lv := l.(api.NamespaceName) + rv := r.(api.NamespaceName) + return lv < rv +} + // QueueOrderFn invoke queueorder function of the plugins func (ssn *Session) QueueOrderFn(l, r interface{}) bool { for _, tier := range ssn.Tiers { diff --git a/pkg/scheduler/plugins/defaults.go b/pkg/scheduler/plugins/defaults.go index 303e2ee858..4aedbdf85d 100644 --- a/pkg/scheduler/plugins/defaults.go +++ b/pkg/scheduler/plugins/defaults.go @@ -25,6 +25,9 @@ func ApplyPluginConfDefaults(option *conf.PluginOption) { if option.EnabledJobOrder == nil { option.EnabledJobOrder = &t } + if option.EnabledNamespaceOrder == nil { + option.EnabledNamespaceOrder = &t + } if option.EnabledJobReady == nil { option.EnabledJobReady = &t } diff --git a/pkg/scheduler/plugins/drf/drf.go b/pkg/scheduler/plugins/drf/drf.go index 94b7ab5e7d..e7adc0d4d4 100644 --- a/pkg/scheduler/plugins/drf/drf.go +++ b/pkg/scheduler/plugins/drf/drf.go @@ -43,6 +43,9 @@ type drfPlugin struct { // Key is Job ID jobAttrs map[api.JobID]*drfAttr + // map[namespaceName]->attr + namespaceOpts map[string]*drfAttr + // Arguments given for the plugin pluginArguments framework.Arguments } @@ -52,6 +55,7 @@ func New(arguments framework.Arguments) framework.Plugin { return &drfPlugin{ totalResource: api.EmptyResource(), jobAttrs: map[api.JobID]*drfAttr{}, + namespaceOpts: map[string]*drfAttr{}, pluginArguments: arguments, } } @@ -60,12 +64,27 @@ func (drf *drfPlugin) Name() string { return PluginName } +// NamespaceOrderEnabled returns the NamespaceOrder for this plugin is enabled in this session or not +func (drf *drfPlugin) NamespaceOrderEnabled(ssn *framework.Session) bool { + for _, tier := range ssn.Tiers { + for _, plugin := range tier.Plugins { + if plugin.Name != PluginName { + continue + } + return plugin.EnabledNamespaceOrder != nil && *plugin.EnabledNamespaceOrder + } + } + return false +} + func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) { // Prepare scheduling data for this session. 
for _, n := range ssn.Nodes { drf.totalResource.Add(n.Allocatable) } + namespaceOrderEnabled := drf.NamespaceOrderEnabled(ssn) + for _, job := range ssn.Jobs { attr := &drfAttr{ allocated: api.EmptyResource(), @@ -83,11 +102,78 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) { drf.updateShare(attr) drf.jobAttrs[job.UID] = attr + + if namespaceOrderEnabled { + nsOpts, found := drf.namespaceOpts[job.Namespace] + if !found { + nsOpts = &drfAttr{ + allocated: api.EmptyResource(), + } + drf.namespaceOpts[job.Namespace] = nsOpts + } + // all task in job should have the same namespace with job + nsOpts.allocated.Add(attr.allocated) + drf.updateShare(nsOpts) + } } preemptableFn := func(preemptor *api.TaskInfo, preemptees []*api.TaskInfo) []*api.TaskInfo { var victims []*api.TaskInfo + addVictim := func(candidate *api.TaskInfo) { + victims = append(victims, candidate) + } + + if namespaceOrderEnabled { + // apply the namespace share policy on preemptee firstly + + lWeight := ssn.NamespaceInfo[api.NamespaceName(preemptor.Namespace)].GetWeight() + lNsAtt := drf.namespaceOpts[preemptor.Namespace] + lNsAlloc := lNsAtt.allocated.Clone().Add(preemptor.Resreq) + _, lNsShare := drf.calculateShare(lNsAlloc, drf.totalResource) + lNsShareWeighted := lNsShare / float64(lWeight) + + namespaceAllocation := map[string]*api.Resource{} + + // undecidedPreemptees means this policy could not judge preemptee is preemptable or not + // and left it to next policy + undecidedPreemptees := []*api.TaskInfo{} + + for _, preemptee := range preemptees { + if preemptor.Namespace == preemptee.Namespace { + // policy is disabled when they are in the same namespace + undecidedPreemptees = append(undecidedPreemptees, preemptee) + continue + } + + // compute the preemptee namespace weighted share after preemption + nsAllocation, found := namespaceAllocation[preemptee.Namespace] + if !found { + rNsAtt := drf.namespaceOpts[preemptee.Namespace] + nsAllocation = rNsAtt.allocated.Clone() + namespaceAllocation[preemptee.Namespace] = nsAllocation + } + rWeight := ssn.NamespaceInfo[api.NamespaceName(preemptee.Namespace)].GetWeight() + rNsAlloc := nsAllocation.Sub(preemptee.Resreq) + _, rNsShare := drf.calculateShare(rNsAlloc, drf.totalResource) + rNsShareWeighted := rNsShare / float64(rWeight) + + // to avoid ping pong actions, the preemptee namespace should + // have the higher weighted share after preemption. 
+ if lNsShareWeighted < rNsShareWeighted { + addVictim(preemptee) + } + if lNsShareWeighted-rNsShareWeighted > shareDelta { + continue + } + + // equal namespace order leads to judgement of jobOrder + undecidedPreemptees = append(undecidedPreemptees, preemptee) + } + + preemptees = undecidedPreemptees + } + latt := drf.jobAttrs[preemptor.Job] lalloc := latt.allocated.Clone().Add(preemptor.Resreq) _, ls := drf.calculateShare(lalloc, drf.totalResource) @@ -103,7 +189,7 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) { _, rs := drf.calculateShare(ralloc, drf.totalResource) if ls < rs || math.Abs(ls-rs) <= shareDelta { - victims = append(victims, preemptee) + addVictim(preemptee) } } @@ -134,6 +220,37 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) { ssn.AddJobOrderFn(drf.Name(), jobOrderFn) + namespaceOrderFn := func(l interface{}, r interface{}) int { + lv := l.(api.NamespaceName) + rv := r.(api.NamespaceName) + + lOpt := drf.namespaceOpts[string(lv)] + rOpt := drf.namespaceOpts[string(rv)] + + lWeight := ssn.NamespaceInfo[lv].GetWeight() + rWeight := ssn.NamespaceInfo[rv].GetWeight() + + glog.V(4).Infof("DRF NamespaceOrderFn: <%v> share state: %f, weight %v, <%v> share state: %f, weight %v", + lv, lOpt.share, lWeight, rv, rOpt.share, rWeight) + + lWeightedShare := lOpt.share / float64(lWeight) + rWeightedShare := rOpt.share / float64(rWeight) + + if lWeightedShare == rWeightedShare { + return 0 + } + + if lWeightedShare < rWeightedShare { + return -1 + } + + return 1 + } + + if namespaceOrderEnabled { + ssn.AddNamespaceOrderFn(drf.Name(), namespaceOrderFn) + } + // Register event handlers. ssn.AddEventHandler(&framework.EventHandler{ AllocateFunc: func(event *framework.Event) { @@ -142,8 +259,17 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) { drf.updateShare(attr) - glog.V(4).Infof("DRF AllocateFunc: task <%v/%v>, resreq <%v>, share <%v>", - event.Task.Namespace, event.Task.Name, event.Task.Resreq, attr.share) + nsShare := -1.0 + if namespaceOrderEnabled { + nsOpt := drf.namespaceOpts[event.Task.Namespace] + nsOpt.allocated.Add(event.Task.Resreq) + + drf.updateShare(nsOpt) + nsShare = nsOpt.share + } + + glog.V(4).Infof("DRF AllocateFunc: task <%v/%v>, resreq <%v>, share <%v>, namespace share <%v>", + event.Task.Namespace, event.Task.Name, event.Task.Resreq, attr.share, nsShare) }, DeallocateFunc: func(event *framework.Event) { attr := drf.jobAttrs[event.Task.Job] @@ -151,8 +277,17 @@ func (drf *drfPlugin) OnSessionOpen(ssn *framework.Session) { drf.updateShare(attr) - glog.V(4).Infof("DRF EvictFunc: task <%v/%v>, resreq <%v>, share <%v>", - event.Task.Namespace, event.Task.Name, event.Task.Resreq, attr.share) + nsShare := -1.0 + if namespaceOrderEnabled { + nsOpt := drf.namespaceOpts[event.Task.Namespace] + nsOpt.allocated.Sub(event.Task.Resreq) + + drf.updateShare(nsOpt) + nsShare = nsOpt.share + } + + glog.V(4).Infof("DRF EvictFunc: task <%v/%v>, resreq <%v>, share <%v>, namespace share <%v>", + event.Task.Namespace, event.Task.Name, event.Task.Resreq, attr.share, nsShare) }, }) } diff --git a/pkg/scheduler/util_test.go b/pkg/scheduler/util_test.go index f127d36828..cd3829c9b4 100644 --- a/pkg/scheduler/util_test.go +++ b/pkg/scheduler/util_test.go @@ -44,92 +44,99 @@ tiers: { Plugins: []conf.PluginOption{ { - Name: "priority", - EnabledJobOrder: &trueValue, - EnabledJobReady: &trueValue, - EnabledJobPipelined: &trueValue, - EnabledTaskOrder: &trueValue, - EnabledPreemptable: &trueValue, - EnabledReclaimable: &trueValue, - EnabledQueueOrder: 
&trueValue, - EnabledPredicate: &trueValue, - EnabledNodeOrder: &trueValue, + Name: "priority", + EnabledJobOrder: &trueValue, + EnabledNamespaceOrder: &trueValue, + EnabledJobReady: &trueValue, + EnabledJobPipelined: &trueValue, + EnabledTaskOrder: &trueValue, + EnabledPreemptable: &trueValue, + EnabledReclaimable: &trueValue, + EnabledQueueOrder: &trueValue, + EnabledPredicate: &trueValue, + EnabledNodeOrder: &trueValue, }, { - Name: "gang", - EnabledJobOrder: &trueValue, - EnabledJobReady: &trueValue, - EnabledJobPipelined: &trueValue, - EnabledTaskOrder: &trueValue, - EnabledPreemptable: &trueValue, - EnabledReclaimable: &trueValue, - EnabledQueueOrder: &trueValue, - EnabledPredicate: &trueValue, - EnabledNodeOrder: &trueValue, + Name: "gang", + EnabledJobOrder: &trueValue, + EnabledNamespaceOrder: &trueValue, + EnabledJobReady: &trueValue, + EnabledJobPipelined: &trueValue, + EnabledTaskOrder: &trueValue, + EnabledPreemptable: &trueValue, + EnabledReclaimable: &trueValue, + EnabledQueueOrder: &trueValue, + EnabledPredicate: &trueValue, + EnabledNodeOrder: &trueValue, }, { - Name: "conformance", - EnabledJobOrder: &trueValue, - EnabledJobReady: &trueValue, - EnabledJobPipelined: &trueValue, - EnabledTaskOrder: &trueValue, - EnabledPreemptable: &trueValue, - EnabledReclaimable: &trueValue, - EnabledQueueOrder: &trueValue, - EnabledPredicate: &trueValue, - EnabledNodeOrder: &trueValue, + Name: "conformance", + EnabledJobOrder: &trueValue, + EnabledNamespaceOrder: &trueValue, + EnabledJobReady: &trueValue, + EnabledJobPipelined: &trueValue, + EnabledTaskOrder: &trueValue, + EnabledPreemptable: &trueValue, + EnabledReclaimable: &trueValue, + EnabledQueueOrder: &trueValue, + EnabledPredicate: &trueValue, + EnabledNodeOrder: &trueValue, }, }, }, { Plugins: []conf.PluginOption{ { - Name: "drf", - EnabledJobOrder: &trueValue, - EnabledJobReady: &trueValue, - EnabledJobPipelined: &trueValue, - EnabledTaskOrder: &trueValue, - EnabledPreemptable: &trueValue, - EnabledReclaimable: &trueValue, - EnabledQueueOrder: &trueValue, - EnabledPredicate: &trueValue, - EnabledNodeOrder: &trueValue, + Name: "drf", + EnabledJobOrder: &trueValue, + EnabledNamespaceOrder: &trueValue, + EnabledJobReady: &trueValue, + EnabledJobPipelined: &trueValue, + EnabledTaskOrder: &trueValue, + EnabledPreemptable: &trueValue, + EnabledReclaimable: &trueValue, + EnabledQueueOrder: &trueValue, + EnabledPredicate: &trueValue, + EnabledNodeOrder: &trueValue, }, { - Name: "predicates", - EnabledJobOrder: &trueValue, - EnabledJobReady: &trueValue, - EnabledJobPipelined: &trueValue, - EnabledTaskOrder: &trueValue, - EnabledPreemptable: &trueValue, - EnabledReclaimable: &trueValue, - EnabledQueueOrder: &trueValue, - EnabledPredicate: &trueValue, - EnabledNodeOrder: &trueValue, + Name: "predicates", + EnabledJobOrder: &trueValue, + EnabledNamespaceOrder: &trueValue, + EnabledJobReady: &trueValue, + EnabledJobPipelined: &trueValue, + EnabledTaskOrder: &trueValue, + EnabledPreemptable: &trueValue, + EnabledReclaimable: &trueValue, + EnabledQueueOrder: &trueValue, + EnabledPredicate: &trueValue, + EnabledNodeOrder: &trueValue, }, { - Name: "proportion", - EnabledJobOrder: &trueValue, - EnabledJobReady: &trueValue, - EnabledJobPipelined: &trueValue, - EnabledTaskOrder: &trueValue, - EnabledPreemptable: &trueValue, - EnabledReclaimable: &trueValue, - EnabledQueueOrder: &trueValue, - EnabledPredicate: &trueValue, - EnabledNodeOrder: &trueValue, + Name: "proportion", + EnabledJobOrder: &trueValue, + EnabledNamespaceOrder: &trueValue, + 
EnabledJobReady: &trueValue, + EnabledJobPipelined: &trueValue, + EnabledTaskOrder: &trueValue, + EnabledPreemptable: &trueValue, + EnabledReclaimable: &trueValue, + EnabledQueueOrder: &trueValue, + EnabledPredicate: &trueValue, + EnabledNodeOrder: &trueValue, }, { - Name: "nodeorder", - EnabledJobOrder: &trueValue, - EnabledJobReady: &trueValue, - EnabledJobPipelined: &trueValue, - EnabledTaskOrder: &trueValue, - EnabledPreemptable: &trueValue, - EnabledReclaimable: &trueValue, - EnabledQueueOrder: &trueValue, - EnabledPredicate: &trueValue, - EnabledNodeOrder: &trueValue, + Name: "nodeorder", + EnabledJobOrder: &trueValue, + EnabledNamespaceOrder: &trueValue, + EnabledJobReady: &trueValue, + EnabledJobPipelined: &trueValue, + EnabledTaskOrder: &trueValue, + EnabledPreemptable: &trueValue, + EnabledReclaimable: &trueValue, + EnabledQueueOrder: &trueValue, + EnabledPredicate: &trueValue, + EnabledNodeOrder: &trueValue, }, }, }, diff --git a/test/e2e/job_scheduling.go b/test/e2e/job_scheduling.go index cd836e8701..95e792a460 100644 --- a/test/e2e/job_scheduling.go +++ b/test/e2e/job_scheduling.go @@ -18,6 +18,7 @@ package e2e import ( "fmt" + "strings" "time" . "github.com/onsi/ginkgo" @@ -27,7 +28,9 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" kbapi "volcano.sh/volcano/pkg/scheduler/api" ) @@ -474,4 +477,196 @@ var _ = Describe("Job E2E Test", func() { err = waitJobPhaseReady(context, job) Expect(err).NotTo(HaveOccurred()) }) + + It("Namespace Fair Share", func() { + context := initTestContext() + defer cleanupTestContext(context) + + const fairShareNamespace = "fairshare" + + _, err := context.kubeclient.CoreV1().Namespaces().Create(&v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: fairShareNamespace, + }, + }) + Expect(err).NotTo(HaveOccurred()) + defer func() { + deleteForeground := metav1.DeletePropagationForeground + err := context.kubeclient.CoreV1().Namespaces().Delete(fairShareNamespace, &metav1.DeleteOptions{ + PropagationPolicy: &deleteForeground, + }) + Expect(err).NotTo(HaveOccurred()) + + err = wait.Poll(100*time.Millisecond, twoMinute, namespaceNotExistWithName(context, fairShareNamespace)) + Expect(err).NotTo(HaveOccurred()) + }() + + slot := halfCPU + rep := clusterSize(context, slot) + + createJobToNamespace := func(namespace string, index int, replica int32) *vkv1.Job { + spec := &jobSpec{ + name: fmt.Sprintf("namespace-fair-share-%s-%d", namespace, index), + namespace: namespace, + tasks: []taskSpec{ + { + img: defaultNginxImage, + command: "sleep 10000", + req: slot, + min: 2, + rep: replica, + }, + }, + } + job := createJob(context, spec) + return job + } + + By("occupy all cluster resources") + occupiedJob := createJobToNamespace("default", 123, rep*2) + err = waitJobReady(context, occupiedJob) + Expect(err).NotTo(HaveOccurred()) + + for i := 0; i < int(rep); i++ { + createJobToNamespace("test", i, 2) + createJobToNamespace(fairShareNamespace, i, 2) + } + + By("release occupied cluster resources") + deleteBackground := metav1.DeletePropagationBackground + err = context.vcclient.BatchV1alpha1().Jobs(occupiedJob.Namespace).Delete(occupiedJob.Name, + &metav1.DeleteOptions{ + PropagationPolicy: &deleteBackground, + }) + Expect(err).NotTo(HaveOccurred()) + + By("wait occupied cluster resources releasing") + err = waitJobCleanedUp(context, occupiedJob) + Expect(err).NotTo(HaveOccurred()) + + 
By("wait pod in fs/test namespace scheduled") + fsScheduledPod := 0 + testScheduledPod := 0 + expectPod := int(rep) + err = wait.Poll(100*time.Millisecond, oneMinute, func() (bool, error) { + fsScheduledPod = 0 + testScheduledPod = 0 + + pods, err := context.kubeclient.CoreV1().Pods(fairShareNamespace).List(metav1.ListOptions{}) + if err != nil { + return false, err + } + for _, pod := range pods.Items { + if isPodScheduled(&pod) { + fsScheduledPod++ + } + } + + pods, err = context.kubeclient.CoreV1().Pods("test").List(metav1.ListOptions{}) + if err != nil { + return false, err + } + for _, pod := range pods.Items { + if isPodScheduled(&pod) { + testScheduledPod++ + } + } + + if testScheduledPod+fsScheduledPod == expectPod { + return true, nil + } + + return false, nil + }) + Expect(err).NotTo(HaveOccurred()) + + Expect(testScheduledPod).Should(Or(Equal(expectPod/2), Equal((expectPod+1)/2)), + fmt.Sprintf("expectPod %d, fsScheduledPod %d, testScheduledPod %d", expectPod, fsScheduledPod, testScheduledPod)) + }) + + It("Queue Fair Share", func() { + context := initTestContext() + defer cleanupTestContext(context) + + slot := halfCPU + rep := clusterSize(context, slot) + + createJobToQueue := func(queue string, index int, replica int32) *vkv1.Job { + spec := &jobSpec{ + name: fmt.Sprintf("queue-fair-share-%s-%d", queue, index), + namespace: "test", + queue: queue, + tasks: []taskSpec{ + { + img: defaultNginxImage, + command: "sleep 10000", + req: slot, + min: 2, + rep: replica, + }, + }, + } + job := createJob(context, spec) + return job + } + + By("occupy all cluster resources") + occupiedJob := createJobToQueue("default", 123, rep*2) + err := waitJobReady(context, occupiedJob) + Expect(err).NotTo(HaveOccurred()) + + for i := 0; i < int(rep); i++ { + createJobToQueue(defaultQueue1, i, 2) + createJobToQueue(defaultQueue2, i, 2) + } + + By(fmt.Sprintf("release occupied cluster resources, %s/%s", occupiedJob.Namespace, occupiedJob.Name)) + deleteForeground := metav1.DeletePropagationBackground + err = context.vcclient.BatchV1alpha1().Jobs(occupiedJob.Namespace).Delete(occupiedJob.Name, + &metav1.DeleteOptions{ + PropagationPolicy: &deleteForeground, + }) + Expect(err).NotTo(HaveOccurred()) + + By("wait occupied cluster resources releasing") + err = waitJobCleanedUp(context, occupiedJob) + Expect(err).NotTo(HaveOccurred()) + + By("wait pod in queue q1/q2 scheduled") + q1ScheduledPod := 0 + q2ScheduledPod := 0 + expectPod := int(rep) + err = wait.Poll(100*time.Millisecond, oneMinute, func() (bool, error) { + q1ScheduledPod = 0 + q2ScheduledPod = 0 + + pods, err := context.kubeclient.CoreV1().Pods("test").List(metav1.ListOptions{}) + if err != nil { + return false, err + } + for _, pod := range pods.Items { + if !isPodScheduled(&pod) { + continue + } + jobName := pod.Annotations[vkv1.JobNameKey] + if strings.Contains(jobName, "queue-fair-share-"+defaultQueue1) { + q1ScheduledPod++ + } + if strings.Contains(jobName, "queue-fair-share-"+defaultQueue2) { + q2ScheduledPod++ + } + } + + if q2ScheduledPod+q1ScheduledPod == expectPod { + return true, nil + } + + return false, nil + }) + Expect(err).NotTo(HaveOccurred()) + + Expect(q2ScheduledPod).Should(Or(Equal(expectPod/2), Equal((expectPod+1)/2)), + fmt.Sprintf("expectPod %d, q1ScheduledPod %d, q2ScheduledPod %d", expectPod, q1ScheduledPod, q2ScheduledPod)) + }) + }) diff --git a/test/e2e/util.go b/test/e2e/util.go index c2c5e6f4e6..f89d739346 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -52,6 +52,7 @@ var ( twoMinute = 2 * time.Minute 
oneCPU = v1.ResourceList{"cpu": resource.MustParse("1000m")} thirtyCPU = v1.ResourceList{"cpu": resource.MustParse("30000m")} + halfCPU = v1.ResourceList{"cpu": resource.MustParse("500m")} ) const ( @@ -167,8 +168,12 @@ func initTestContext() *context { } func namespaceNotExist(ctx *context) wait.ConditionFunc { + return namespaceNotExistWithName(ctx, ctx.namespace) +} + +func namespaceNotExistWithName(ctx *context, name string) wait.ConditionFunc { return func() (bool, error) { - _, err := ctx.kubeclient.CoreV1().Namespaces().Get(ctx.namespace, metav1.GetOptions{}) + _, err := ctx.kubeclient.CoreV1().Namespaces().Get(name, metav1.GetOptions{}) if !(err != nil && errors.IsNotFound(err)) { return false, err } @@ -752,6 +757,9 @@ func createReplicaSet(context *context, name string, rep int32, img string, req func waitJobCleanedUp(ctx *context, cleanupjob *batchv1alpha1.Job) error { var additionalError error + + pods := getTasksOfJob(ctx, cleanupjob) + err := wait.Poll(100*time.Millisecond, oneMinute, func() (bool, error) { job, err := ctx.vcclient.BatchV1alpha1().Jobs(cleanupjob.Namespace).Get(cleanupjob.Name, metav1.GetOptions{}) if err != nil && !errors.IsNotFound(err) { @@ -776,6 +784,14 @@ func waitJobCleanedUp(ctx *context, cleanupjob *batchv1alpha1.Job) error { if err != nil && strings.Contains(err.Error(), timeOutMessage) { return fmt.Errorf("[Wait time out]: %s", additionalError) } + + for _, pod := range pods { + err := waitPodGone(ctx, pod.Name, pod.Namespace) + if err != nil { + return err + } + } + return err } @@ -1159,3 +1175,12 @@ func pgIsReady(ctx *context, namespace string) (bool, error) { return false, fmt.Errorf("podgroup phase is Pending") } + +func isPodScheduled(pod *v1.Pod) bool { + for _, cond := range pod.Status.Conditions { + if cond.Type == v1.PodScheduled && cond.Status == v1.ConditionTrue { + return true + } + } + return false +}
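
Usage note (illustrative, not part of the patch): a namespace's weight is read from the volcano.sh/namespace.weight key in a ResourceQuota's spec.hard (see NamespaceWeightKey in pkg/scheduler/api/namespace_info.go). When several quotas exist in one namespace, the highest weight wins, and a missing or zero weight falls back to DefaultNamespaceWeight (1). A minimal sketch of a quota that gives a hypothetical namespace "team-a" a weight of 4; the namespace, quota name, and value are made up for illustration:

    apiVersion: v1
    kind: ResourceQuota
    metadata:
      name: namespace-weight
      namespace: team-a
    spec:
      hard:
        volcano.sh/namespace.weight: "4"

The per-plugin switch is the new enableNamespaceOrder option on PluginOption (pkg/scheduler/conf/scheduler_conf.go); ApplyPluginConfDefaults enables it by default, so setting it explicitly is only for clarity. A sketch of a scheduler configuration tier that turns it on for the drf plugin, assuming the usual actions/tiers layout of the Volcano scheduler configuration:

    actions: "allocate, backfill"
    tiers:
    - plugins:
      - name: drf
        enableNamespaceOrder: true
      - name: proportion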