Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor namespace fairshare function. #2757

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 22 additions & 62 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,16 @@ func (alloc *Action) Execute(ssn *framework.Session) {
defer klog.V(3).Infof("Leaving Allocate ...")

// the allocation for pod may have many stages
// 1. pick a namespace named N (using ssn.NamespaceOrderFn)
// 2. pick a queue named Q from N (using ssn.QueueOrderFn)
// 3. pick a job named J from Q (using ssn.JobOrderFn)
// 4. pick a task T from J (using ssn.TaskOrderFn)
// 5. use predicateFn to filter out node that T can not be allocated on.
// 6. use ssn.NodeOrderFn to judge the best node and assign it to T
// 1. pick a queue named Q (using ssn.QueueOrderFn)
// 2. pick a job named J from Q (using ssn.JobOrderFn)
// 3. pick a task T from J (using ssn.TaskOrderFn)
// 4. use predicateFn to filter out node that T can not be allocated on.
// 5. use ssn.NodeOrderFn to judge the best node and assign it to T

namespaces := util.NewPriorityQueue(ssn.NamespaceOrderFn)

// jobsMap is map[api.NamespaceName]map[api.QueueID]PriorityQueue(*api.JobInfo)
// used to find job with highest priority in given queue and namespace
jobsMap := map[api.NamespaceName]map[api.QueueID]*util.PriorityQueue{}
// queues sort queues by QueueOrderFn.
queues := util.NewPriorityQueue(ssn.QueueOrderFn)
// jobsMap is used to find job with the highest priority in given queue.
jobsMap := map[api.QueueID]*util.PriorityQueue{}

for _, job := range ssn.Jobs {
// If not config enqueue action, change Pending pg into Inqueue statue to avoid blocking job scheduling.
Expand All @@ -84,26 +82,16 @@ func (alloc *Action) Execute(ssn *framework.Session) {
continue
}

namespace := api.NamespaceName(job.Namespace)
queueMap, found := jobsMap[namespace]
if !found {
namespaces.Push(namespace)

queueMap = make(map[api.QueueID]*util.PriorityQueue)
jobsMap[namespace] = queueMap
}

jobs, found := queueMap[job.Queue]
if !found {
jobs = util.NewPriorityQueue(ssn.JobOrderFn)
queueMap[job.Queue] = jobs
if _, found := jobsMap[job.Queue]; !found {
jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
queues.Push(ssn.Queues[job.Queue])
}

klog.V(4).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
jobs.Push(job)
jobsMap[job.Queue].Push(job)
}

klog.V(3).Infof("Try to allocate resource to %d Namespaces", len(jobsMap))
klog.V(3).Infof("Try to allocate resource to %d Queues", len(jobsMap))

pendingTasks := map[api.JobID]*util.PriorityQueue{}

Expand All @@ -121,49 +109,21 @@ func (alloc *Action) Execute(ssn *framework.Session) {
// Because we believe that number of queues would less than namespaces in most case.
// And, this action would make the resource usage among namespace balanced.
for {
if namespaces.Empty() {
if queues.Empty() {
break
}

// pick namespace from namespaces PriorityQueue
namespace := namespaces.Pop().(api.NamespaceName)

queueInNamespace := jobsMap[namespace]

// pick queue for given namespace
//
// This block use an algorithm with time complex O(n).
// But at least PriorityQueue could not be used here,
// because the allocation of job would change the priority of queue among all namespaces,
// and the PriorityQueue have no ability to update priority for a special queue.
var queue *api.QueueInfo
for queueID := range queueInNamespace {
currentQueue := ssn.Queues[queueID]
if ssn.Overused(currentQueue) {
klog.V(3).Infof("Namespace <%s> Queue <%s> is overused, ignore it.", namespace, currentQueue.Name)
delete(queueInNamespace, queueID)
continue
}
if jobs, found := queueInNamespace[currentQueue.UID]; found && jobs.Empty() {
continue
}

if queue == nil || ssn.QueueOrderFn(currentQueue, queue) {
queue = currentQueue
}
}
queue := queues.Pop().(*api.QueueInfo)

if queue == nil {
klog.V(3).Infof("Namespace <%s> have no queue, skip it", namespace)
if ssn.Overused(queue) {
klog.V(3).Infof("Queue <%s> is overused, ignore it.", queue.Name)
continue
}

klog.V(3).Infof("Try to allocate resource to Jobs in Namespace <%s> Queue <%v>", namespace, queue.Name)
klog.V(3).Infof("Try to allocate resource to Jobs in Queue <%s>", queue.Name)

jobs, found := queueInNamespace[queue.UID]
jobs, found := jobsMap[queue.UID]
if !found || jobs.Empty() {
delete(queueInNamespace, queue.UID)
namespaces.Push(namespace)
klog.V(4).Infof("Can not find jobs for queue %s.", queue.Name)
continue
}
Expand All @@ -185,8 +145,8 @@ func (alloc *Action) Execute(ssn *framework.Session) {
}
tasks := pendingTasks[job.UID]

// Added Namespace back until no job in Namespace.
namespaces.Push(namespace)
// Added Queue back until no job in Namespace.
queues.Push(queue)

if tasks.Empty() {
continue
Expand Down
7 changes: 3 additions & 4 deletions pkg/scheduler/actions/allocate/allocate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,9 @@ func TestAllocate(t *testing.T) {
{
Plugins: []conf.PluginOption{
{
Name: "drf",
EnabledPreemptable: &trueValue,
EnabledJobOrder: &trueValue,
EnabledNamespaceOrder: &trueValue,
Name: "drf",
EnabledPreemptable: &trueValue,
EnabledJobOrder: &trueValue,
},
{
Name: "proportion",
Expand Down
3 changes: 1 addition & 2 deletions pkg/scheduler/api/cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ func (ci ClusterInfo) String() string {
if len(ci.NamespaceInfo) != 0 {
str += "Namespaces:\n"
for _, ns := range ci.NamespaceInfo {
str += fmt.Sprintf("\t Namespace(%s) Weight(%v)\n",
ns.Name, ns.Weight)
str += fmt.Sprintf("\t Namespace(%s)\n", ns.Name)
}
}

Expand Down
91 changes: 1 addition & 90 deletions pkg/scheduler/api/namespace_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,138 +17,49 @@ limitations under the License.
package api

import (
"fmt"

v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)

// NamespaceName is name of namespace
type NamespaceName string

const (
// NamespaceWeightKey is the key in ResourceQuota.spec.hard indicating the weight of this namespace
NamespaceWeightKey = "volcano.sh/namespace.weight"
// DefaultNamespaceWeight is the default weight of namespace
DefaultNamespaceWeight = 1
)

// NamespaceInfo records information of namespace
type NamespaceInfo struct {
// Name is the name of this namespace
Name NamespaceName
// Weight is the highest weight among many ResourceQuota.
Weight int64

// QuotaStatus stores the ResourceQuotaStatus of all ResourceQuotas in this namespace
QuotaStatus map[string]v1.ResourceQuotaStatus
}

// GetWeight returns weight of a namespace, any invalid case would get default value
func (n *NamespaceInfo) GetWeight() int64 {
if n == nil || n.Weight == 0 {
return DefaultNamespaceWeight
}
return n.Weight
}

type quotaItem struct {
name string
weight int64
}

func quotaItemKeyFunc(obj interface{}) (string, error) {
item, ok := obj.(*quotaItem)
if !ok {
return "", fmt.Errorf("obj with type %T could not parse", obj)
}
return item.name, nil
}

// for big root heap
func quotaItemLessFunc(a interface{}, b interface{}) bool {
A := a.(*quotaItem)
B := b.(*quotaItem)
return A.weight > B.weight
}

// NamespaceCollection will record all details about namespace
type NamespaceCollection struct {
Name string

quotaWeight *cache.Heap

Name string
QuotaStatus map[string]v1.ResourceQuotaStatus
}

// NewNamespaceCollection creates new NamespaceCollection object to record all information about a namespace
func NewNamespaceCollection(name string) *NamespaceCollection {
n := &NamespaceCollection{
Name: name,
quotaWeight: cache.NewHeap(quotaItemKeyFunc, quotaItemLessFunc),
QuotaStatus: make(map[string]v1.ResourceQuotaStatus),
}
// add at least one item into quotaWeight.
// Because cache.Heap.Pop would be blocked until queue is not empty
n.updateWeight(&quotaItem{
name: NamespaceWeightKey,
weight: DefaultNamespaceWeight,
})
return n
}

func (n *NamespaceCollection) deleteWeight(q *quotaItem) {
n.quotaWeight.Delete(q)
}

func (n *NamespaceCollection) updateWeight(q *quotaItem) {
n.quotaWeight.Update(q)
}

func itemFromQuota(quota *v1.ResourceQuota) *quotaItem {
var weight int64 = DefaultNamespaceWeight

quotaWeight, ok := quota.Spec.Hard[NamespaceWeightKey]
if ok {
weight = quotaWeight.Value()
}

item := &quotaItem{
name: quota.Name,
weight: weight,
}
return item
}

// Update modify the registered information according quota object
func (n *NamespaceCollection) Update(quota *v1.ResourceQuota) {
n.updateWeight(itemFromQuota(quota))
n.QuotaStatus[quota.Name] = quota.Status
}

// Delete remove the registered information according quota object
func (n *NamespaceCollection) Delete(quota *v1.ResourceQuota) {
n.deleteWeight(itemFromQuota(quota))
delete(n.QuotaStatus, quota.Name)
}

// Snapshot will clone a NamespaceInfo without Heap according NamespaceCollection
func (n *NamespaceCollection) Snapshot() *NamespaceInfo {
var weight int64 = DefaultNamespaceWeight

obj, err := n.quotaWeight.Pop()
if err != nil {
klog.Warningf("namespace %s, quota weight meets error %v when pop", n.Name, err)
} else {
item := obj.(*quotaItem)
weight = item.weight
n.quotaWeight.Add(item)
}

return &NamespaceInfo{
Name: NamespaceName(n.Name),
Weight: weight,
QuotaStatus: n.QuotaStatus,
}
}
66 changes: 10 additions & 56 deletions pkg/scheduler/api/namespace_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func newQuota(name string, weight, req int) *v1.ResourceQuota {
func newQuota(name string, req int) *v1.ResourceQuota {
q := &v1.ResourceQuota{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -39,84 +39,38 @@ func newQuota(name string, weight, req int) *v1.ResourceQuota {
},
}

if weight >= 0 {
q.Spec.Hard[v1.ResourceName(NamespaceWeightKey)] = *resource.NewQuantity(int64(weight), resource.DecimalSI)
}

return q
}

func TestNamespaceCollection(t *testing.T) {
c := NewNamespaceCollection("testCollection")
c.Update(newQuota("abc", 123, 1))
c.Update(newQuota("abc", 1))

info := c.Snapshot()
req := info.QuotaStatus["abc"].Hard[v1.ResourceCPU]
if req.Value() != 1 {
t.Errorf("cpu request of quota %s should be %d, but got %d", "abc", 1, req.Value())
}

c.Update(newQuota("abc", 456, 2))
c.Update(newQuota("def", -1, 4))
c.Update(newQuota("def", 16, 8))
c.Update(newQuota("ghi", 0, 16))
c.Update(newQuota("abc", 2))
c.Update(newQuota("def", 4))
c.Update(newQuota("def", 8))
c.Update(newQuota("ghi", 16))

info = c.Snapshot()
if info.Weight != 456 {
t.Errorf("weight of namespace should be %d, but got %d", 456, info.Weight)
}
req = info.QuotaStatus["abc"].Hard[v1.ResourceCPU]
if req.Value() != 2 {
t.Errorf("cpu request of quota %s should be %d, but got %d", "abc", 2, req.Value())
}

c.Delete(newQuota("abc", 0, 0))
c.Delete(newQuota("abc", 0))

info = c.Snapshot()
if info.Weight != 16 {
t.Errorf("weight of namespace should be %d, but not %d", 16, info.Weight)
}
if _, ok := info.QuotaStatus["abc"]; ok {
t.Errorf("QuotaStatus abc of namespace should not exist")
}

c.Delete(newQuota("abc", 0, 0))
c.Delete(newQuota("def", 15, 0))
c.Delete(newQuota("ghi", -1, 0))

info = c.Snapshot()
if info.Weight != DefaultNamespaceWeight {
t.Errorf("weight of namespace should be default weight %d, but not %d", DefaultNamespaceWeight, info.Weight)
}
}

func TestEmptyNamespaceCollection(t *testing.T) {
c := NewNamespaceCollection("testEmptyCollection")

info := c.Snapshot()
if info.Weight != DefaultNamespaceWeight {
t.Errorf("weight of namespace should be %d, but not %d", DefaultNamespaceWeight, info.Weight)
}

// snapshot can be called anytime
info = c.Snapshot()
if info.Weight != DefaultNamespaceWeight {
t.Errorf("weight of namespace should be %d, but not %d", DefaultNamespaceWeight, info.Weight)
}

c.Delete(newQuota("abc", 0, 0))

info = c.Snapshot()
if info.Weight != DefaultNamespaceWeight {
t.Errorf("weight of namespace should be %d, but not %d", DefaultNamespaceWeight, info.Weight)
}

c.Delete(newQuota("abc", 0, 0))
c.Delete(newQuota("def", 15, 0))
c.Delete(newQuota("ghi", -1, 0))

info = c.Snapshot()
if info.Weight != DefaultNamespaceWeight {
t.Errorf("weight of namespace should be default weight %d, but not %d", DefaultNamespaceWeight, info.Weight)
}
c.Delete(newQuota("abc", 0))
c.Delete(newQuota("def", 0))
c.Delete(newQuota("ghi", 0))
}
Loading