Skip to content

Commit

Permalink
Merge pull request #358 from lminzhw/drf_fair_share
Browse files Browse the repository at this point in the history
support fair share
  • Loading branch information
volcano-sh-bot committed Aug 26, 2019
2 parents f667715 + 2f918a5 commit c7126c6
Show file tree
Hide file tree
Showing 18 changed files with 935 additions and 107 deletions.
3 changes: 3 additions & 0 deletions installer/helm/chart/volcano/templates/scheduler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ rules:
- apiGroups: [""]
resources: ["namespaces"]
verbs: ["list", "watch"]
- apiGroups: [""]
resources: ["resourcequotas"]
verbs: ["list", "watch"]
- apiGroups: ["storage.k8s.io"]
resources: ["storageclasses"]
verbs: ["list", "watch"]
Expand Down
3 changes: 3 additions & 0 deletions installer/volcano-development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ rules:
- apiGroups: [""]
resources: ["namespaces"]
verbs: ["list", "watch"]
- apiGroups: [""]
resources: ["resourcequotas"]
verbs: ["list", "watch"]
- apiGroups: ["storage.k8s.io"]
resources: ["storageclasses"]
verbs: ["list", "watch"]
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/scheduling/v1alpha1/zz_generated.conversion.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

85 changes: 66 additions & 19 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,20 @@ func (alloc *allocateAction) Initialize() {}
func (alloc *allocateAction) Execute(ssn *framework.Session) {
glog.V(3).Infof("Enter Allocate ...")
defer glog.V(3).Infof("Leaving Allocate ...")
// further
queues := util.NewPriorityQueue(ssn.QueueOrderFn)
jobsMap := map[api.QueueID]*util.PriorityQueue{}

// The allocation of a pod goes through several stages:
// 1. pick a namespace named N (using ssn.NamespaceOrderFn)
// 2. pick a queue named Q from N (using ssn.QueueOrderFn)
// 3. pick a job named J from Q (using ssn.JobOrderFn)
// 4. pick a task T from J (using ssn.TaskOrderFn)
// 5. use predicateFn to filter out nodes that T cannot be allocated on.
// 6. use ssn.NodeOrderFn to pick the best node and assign it to T

namespaces := util.NewPriorityQueue(ssn.NamespaceOrderFn)

// jobsMap is map[api.NamespaceName]map[api.QueueID]PriorityQueue(*api.JobInfo)
// used to find job with highest priority in given queue and namespace
jobsMap := map[api.NamespaceName]map[api.QueueID]*util.PriorityQueue{}

for _, job := range ssn.Jobs {
if job.PodGroup.Status.Phase == scheduling.PodGroupPending {
Expand All @@ -55,23 +66,32 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
continue
}

if queue, found := ssn.Queues[job.Queue]; found {
queues.Push(queue)
} else {
if _, found := ssn.Queues[job.Queue]; !found {
glog.Warningf("Skip adding Job <%s/%s> because its queue %s is not found",
job.Namespace, job.Name, job.Queue)
continue
}

if _, found := jobsMap[job.Queue]; !found {
jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
namespace := api.NamespaceName(job.Namespace)
queueMap, found := jobsMap[namespace]
if !found {
namespaces.Push(namespace)

queueMap = make(map[api.QueueID]*util.PriorityQueue)
jobsMap[namespace] = queueMap
}

jobs, found := queueMap[job.Queue]
if !found {
jobs = util.NewPriorityQueue(ssn.JobOrderFn)
queueMap[job.Queue] = jobs
}

glog.V(4).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
jobsMap[job.Queue].Push(job)
jobs.Push(job)
}

glog.V(3).Infof("Try to allocate resource to %d Queues", len(jobsMap))
glog.V(3).Infof("Try to allocate resource to %d Namespaces", len(jobsMap))

pendingTasks := map[api.JobID]*util.PriorityQueue{}

Expand All @@ -92,21 +112,47 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
return ssn.PredicateFn(task, node)
}

// To pick the <namespace, queue> tuple for a job, we choose to pick the namespace first,
// because we believe the number of queues is smaller than the number of namespaces in most cases.
// This also keeps resource usage balanced among namespaces.
for {
if queues.Empty() {
if namespaces.Empty() {
break
}

queue := queues.Pop().(*api.QueueInfo)
if ssn.Overused(queue) {
glog.V(3).Infof("Queue <%s> is overused, ignore it.", queue.Name)
continue
// pick namespace from namespaces PriorityQueue
namespace := namespaces.Pop().(api.NamespaceName)

queueInNamespace := jobsMap[namespace]

// pick a queue for the given namespace
//
// This block uses an algorithm with O(n) time complexity.
// A PriorityQueue cannot be used here, because allocating a job
// changes the priority of the queue among all namespaces, and the
// PriorityQueue has no way to update the priority of a specific queue.
var queue *api.QueueInfo
for queueId := range queueInNamespace {
currentQueue := ssn.Queues[queueId]
if ssn.Overused(currentQueue) {
glog.V(3).Infof("Namespace <%s> Queue <%s> is overused, ignore it.", namespace, currentQueue.Name)
delete(queueInNamespace, queueId)
continue
}

if queue == nil || ssn.QueueOrderFn(currentQueue, queue) {
queue = currentQueue
}
}

jobs, found := jobsMap[queue.UID]
if queue == nil {
glog.V(3).Infof("Namespace <%s> have no queue, skip it", namespace)
continue
}

glog.V(3).Infof("Try to allocate resource to Jobs in Queue <%v>", queue.Name)
glog.V(3).Infof("Try to allocate resource to Jobs in Namespace <%s> Queue <%v>", namespace, queue.Name)

jobs, found := queueInNamespace[queue.UID]
if !found || jobs.Empty() {
glog.V(4).Infof("Can not find jobs for queue %s.", queue.Name)
continue
Expand Down Expand Up @@ -194,8 +240,9 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
} else {
stmt.Discard()
}
// Added Queue back until no job in Queue.
queues.Push(queue)

// Add the Namespace back until there are no jobs left in it.
namespaces.Push(namespace)
}
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/scheduler/actions/allocate/allocate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,10 @@ func TestAllocate(t *testing.T) {
{
Plugins: []conf.PluginOption{
{
Name: "drf",
EnabledPreemptable: &trueValue,
EnabledJobOrder: &trueValue,
Name: "drf",
EnabledPreemptable: &trueValue,
EnabledJobOrder: &trueValue,
EnabledNamespaceOrder: &trueValue,
},
{
Name: "proportion",
Expand Down
15 changes: 12 additions & 3 deletions pkg/scheduler/api/cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ import "fmt"

// ClusterInfo is a snapshot of cluster by cache.
type ClusterInfo struct {
Jobs map[JobID]*JobInfo
Nodes map[string]*NodeInfo
Queues map[QueueID]*QueueInfo
Jobs map[JobID]*JobInfo
Nodes map[string]*NodeInfo
Queues map[QueueID]*QueueInfo
NamespaceInfo map[NamespaceName]*NamespaceInfo
}

func (ci ClusterInfo) String() string {
Expand Down Expand Up @@ -57,5 +58,13 @@ func (ci ClusterInfo) String() string {
}
}

if len(ci.NamespaceInfo) != 0 {
str = str + "Namespaces:\n"
for _, ns := range ci.NamespaceInfo {
str = str + fmt.Sprintf("\t Namespace(%s) Weight(%v)\n",
ns.Name, ns.Weight)
}
}

return str
}
146 changes: 146 additions & 0 deletions pkg/scheduler/api/namespace_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
Copyright 2018 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package api

import (
"fmt"

"github.com/golang/glog"

v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)

// NamespaceName is the typed name of a Kubernetes namespace, used as a map
// key to distinguish it from other string-typed identifiers in the scheduler.
type NamespaceName string

const (
	// NamespaceWeightKey is the key in ResourceQuota.spec.hard whose value
	// declares the scheduling weight of the quota's namespace.
	NamespaceWeightKey = "volcano.sh/namespace.weight"
	// DefaultNamespaceWeight is the weight used when a namespace declares
	// no weight (or declares zero).
	DefaultNamespaceWeight = 1
)

// NamespaceInfo records the scheduler-relevant information of a namespace.
type NamespaceInfo struct {
	// Name is the name of this namespace
	Name NamespaceName
	// Weight is the highest weight among the namespace's ResourceQuota objects.
	Weight int64
}

// GetWeight returns the weight of the namespace. A nil receiver or an
// unset (zero) weight falls back to DefaultNamespaceWeight.
func (n *NamespaceInfo) GetWeight() int64 {
	if n != nil && n.Weight != 0 {
		return n.Weight
	}
	return DefaultNamespaceWeight
}

// quotaItem is one entry in NamespaceCollection's weight heap: the weight
// contributed by a single ResourceQuota, keyed by the quota's name.
type quotaItem struct {
	name   string
	weight int64
}

// quotaItemKeyFunc is the cache.Heap key function: it identifies a heap
// entry by the ResourceQuota name it was derived from.
func quotaItemKeyFunc(obj interface{}) (string, error) {
	if item, ok := obj.(*quotaItem); ok {
		return item.name, nil
	}
	return "", fmt.Errorf("obj with type %T could not parse", obj)
}

// quotaItemLessFunc orders entries by descending weight so the heap root
// is the item with the largest weight (a max-heap).
func quotaItemLessFunc(a interface{}, b interface{}) bool {
	left := a.(*quotaItem)
	right := b.(*quotaItem)
	return left.weight > right.weight
}

// NamespaceCollection records all weight-related details about a namespace.
type NamespaceCollection struct {
	Name string

	// quotaWeight is a max-heap of quotaItem, one entry per ResourceQuota
	// observed in this namespace; its root holds the highest weight.
	quotaWeight *cache.Heap
}

// NewNamespaceCollection creates a NamespaceCollection that records all
// weight information about the named namespace.
func NewNamespaceCollection(name string) *NamespaceCollection {
	collection := &NamespaceCollection{
		Name:        name,
		quotaWeight: cache.NewHeap(quotaItemKeyFunc, quotaItemLessFunc),
	}
	// Seed the heap with a default entry so that Snapshot's Pop call
	// never blocks on an empty heap.
	collection.updateWeight(&quotaItem{
		name:   NamespaceWeightKey,
		weight: DefaultNamespaceWeight,
	})
	return collection
}

// deleteWeight removes the entry matching q's name from the weight heap.
// Any error from the heap is deliberately ignored (best-effort removal).
func (n *NamespaceCollection) deleteWeight(q *quotaItem) {
	n.quotaWeight.Delete(q)
}

// updateWeight adds q to the weight heap, or replaces the existing entry
// with the same name. Any error from the heap is deliberately ignored.
func (n *NamespaceCollection) updateWeight(q *quotaItem) {
	n.quotaWeight.Update(q)
}

// itemFromQuota builds a heap entry from a ResourceQuota, reading the
// weight from spec.hard[NamespaceWeightKey] and falling back to
// DefaultNamespaceWeight when the key is absent.
func itemFromQuota(quota *v1.ResourceQuota) *quotaItem {
	weight := int64(DefaultNamespaceWeight)
	if quantity, found := quota.Spec.Hard[NamespaceWeightKey]; found {
		weight = quantity.Value()
	}
	return &quotaItem{
		name:   quota.Name,
		weight: weight,
	}
}

// Update records or refreshes the weight derived from the given quota object.
func (n *NamespaceCollection) Update(quota *v1.ResourceQuota) {
	n.updateWeight(itemFromQuota(quota))
}

// Delete removes the weight entry registered for the given quota object.
func (n *NamespaceCollection) Delete(quota *v1.ResourceQuota) {
	n.deleteWeight(itemFromQuota(quota))
}

// Snapshot clones the collection's current state into a plain NamespaceInfo
// (no heap). It peeks at the heap root via Pop and immediately re-adds the
// popped item; on a Pop error it logs a warning and reports the default weight.
func (n *NamespaceCollection) Snapshot() *NamespaceInfo {
	weight := int64(DefaultNamespaceWeight)

	if obj, err := n.quotaWeight.Pop(); err != nil {
		glog.Warningf("namespace %s, quota weight meets error %v when pop", n.Name, err)
	} else {
		item := obj.(*quotaItem)
		weight = item.weight
		// Put the item back: Pop removes it, and the heap must keep it.
		n.quotaWeight.Add(item)
	}

	return &NamespaceInfo{
		Name:   NamespaceName(n.Name),
		Weight: weight,
	}
}
Loading

0 comments on commit c7126c6

Please sign in to comment.