Skip to content

Commit

Permalink
Refactor namespace fairshare function.
Browse files Browse the repository at this point in the history
Signed-off-by: jiangkaihua <jiangkaihua1@huawei.com>
  • Loading branch information
jiangkaihua committed Mar 29, 2023
1 parent 96deb7e commit 4ee689a
Show file tree
Hide file tree
Showing 14 changed files with 164 additions and 632 deletions.
84 changes: 22 additions & 62 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,16 @@ func (alloc *Action) Execute(ssn *framework.Session) {
defer klog.V(3).Infof("Leaving Allocate ...")

// the allocation for pod may have many stages
// 1. pick a namespace named N (using ssn.NamespaceOrderFn)
// 2. pick a queue named Q from N (using ssn.QueueOrderFn)
// 3. pick a job named J from Q (using ssn.JobOrderFn)
// 4. pick a task T from J (using ssn.TaskOrderFn)
// 5. use predicateFn to filter out node that T can not be allocated on.
// 6. use ssn.NodeOrderFn to judge the best node and assign it to T
// 1. pick a queue named Q (using ssn.QueueOrderFn)
// 2. pick a job named J from Q (using ssn.JobOrderFn)
// 3. pick a task T from J (using ssn.TaskOrderFn)
// 4. use predicateFn to filter out nodes that T cannot be allocated on.
// 5. use ssn.NodeOrderFn to judge the best node and assign it to T

namespaces := util.NewPriorityQueue(ssn.NamespaceOrderFn)

// jobsMap is map[api.NamespaceName]map[api.QueueID]PriorityQueue(*api.JobInfo)
// used to find job with highest priority in given queue and namespace
jobsMap := map[api.NamespaceName]map[api.QueueID]*util.PriorityQueue{}
// queues sort queues by QueueOrderFn.
queues := util.NewPriorityQueue(ssn.QueueOrderFn)
// jobsMap is used to find job with the highest priority in given queue.
jobsMap := map[api.QueueID]*util.PriorityQueue{}

for _, job := range ssn.Jobs {
// If the enqueue action is not configured, change Pending pg into Inqueue state to avoid blocking job scheduling.
Expand All @@ -84,26 +82,16 @@ func (alloc *Action) Execute(ssn *framework.Session) {
continue
}

namespace := api.NamespaceName(job.Namespace)
queueMap, found := jobsMap[namespace]
if !found {
namespaces.Push(namespace)

queueMap = make(map[api.QueueID]*util.PriorityQueue)
jobsMap[namespace] = queueMap
}

jobs, found := queueMap[job.Queue]
if !found {
jobs = util.NewPriorityQueue(ssn.JobOrderFn)
queueMap[job.Queue] = jobs
if _, found := jobsMap[job.Queue]; !found {
jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
queues.Push(ssn.Queues[job.Queue])
}

klog.V(4).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
jobs.Push(job)
jobsMap[job.Queue].Push(job)
}

klog.V(3).Infof("Try to allocate resource to %d Namespaces", len(jobsMap))
klog.V(3).Infof("Try to allocate resource to %d Queues", len(jobsMap))

pendingTasks := map[api.JobID]*util.PriorityQueue{}

Expand All @@ -121,49 +109,21 @@ func (alloc *Action) Execute(ssn *framework.Session) {
// Because we believe that the number of queues would be less than the number of namespaces in most cases.
// And this action would make the resource usage among namespaces balanced.
for {
if namespaces.Empty() {
if queues.Empty() {
break
}

// pick namespace from namespaces PriorityQueue
namespace := namespaces.Pop().(api.NamespaceName)

queueInNamespace := jobsMap[namespace]

// pick queue for given namespace
//
// This block use an algorithm with time complex O(n).
// But at least PriorityQueue could not be used here,
// because the allocation of job would change the priority of queue among all namespaces,
// and the PriorityQueue have no ability to update priority for a special queue.
var queue *api.QueueInfo
for queueID := range queueInNamespace {
currentQueue := ssn.Queues[queueID]
if ssn.Overused(currentQueue) {
klog.V(3).Infof("Namespace <%s> Queue <%s> is overused, ignore it.", namespace, currentQueue.Name)
delete(queueInNamespace, queueID)
continue
}
if jobs, found := queueInNamespace[currentQueue.UID]; found && jobs.Empty() {
continue
}

if queue == nil || ssn.QueueOrderFn(currentQueue, queue) {
queue = currentQueue
}
}
queue := queues.Pop().(*api.QueueInfo)

if queue == nil {
klog.V(3).Infof("Namespace <%s> have no queue, skip it", namespace)
if ssn.Overused(queue) {
klog.V(3).Infof("Queue <%s> is overused, ignore it.", queue.Name)
continue
}

klog.V(3).Infof("Try to allocate resource to Jobs in Namespace <%s> Queue <%v>", namespace, queue.Name)
klog.V(3).Infof("Try to allocate resource to Jobs in Queue <%s>", queue.Name)

jobs, found := queueInNamespace[queue.UID]
jobs, found := jobsMap[queue.UID]
if !found || jobs.Empty() {
delete(queueInNamespace, queue.UID)
namespaces.Push(namespace)
klog.V(4).Infof("Can not find jobs for queue %s.", queue.Name)
continue
}
Expand All @@ -185,8 +145,8 @@ func (alloc *Action) Execute(ssn *framework.Session) {
}
tasks := pendingTasks[job.UID]

// Added Namespace back until no job in Namespace.
namespaces.Push(namespace)
// Add the Queue back until there is no job left in the Queue.
queues.Push(queue)

if tasks.Empty() {
continue
Expand Down
7 changes: 3 additions & 4 deletions pkg/scheduler/actions/allocate/allocate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,9 @@ func TestAllocate(t *testing.T) {
{
Plugins: []conf.PluginOption{
{
Name: "drf",
EnabledPreemptable: &trueValue,
EnabledJobOrder: &trueValue,
EnabledNamespaceOrder: &trueValue,
Name: "drf",
EnabledPreemptable: &trueValue,
EnabledJobOrder: &trueValue,
},
{
Name: "proportion",
Expand Down
3 changes: 1 addition & 2 deletions pkg/scheduler/api/cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ func (ci ClusterInfo) String() string {
if len(ci.NamespaceInfo) != 0 {
str += "Namespaces:\n"
for _, ns := range ci.NamespaceInfo {
str += fmt.Sprintf("\t Namespace(%s) Weight(%v)\n",
ns.Name, ns.Weight)
str += fmt.Sprintf("\t Namespace(%s)\n", ns.Name)
}
}

Expand Down
91 changes: 1 addition & 90 deletions pkg/scheduler/api/namespace_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,138 +17,49 @@ limitations under the License.
package api

import (
"fmt"

v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)

// NamespaceName is name of namespace
type NamespaceName string

const (
// NamespaceWeightKey is the key in ResourceQuota.spec.hard indicating the weight of this namespace
NamespaceWeightKey = "volcano.sh/namespace.weight"
// DefaultNamespaceWeight is the default weight of namespace
DefaultNamespaceWeight = 1
)

// NamespaceInfo records information of namespace
type NamespaceInfo struct {
// Name is the name of this namespace
Name NamespaceName
// Weight is the highest weight among many ResourceQuota.
Weight int64

// QuotaStatus stores the ResourceQuotaStatus of all ResourceQuotas in this namespace
QuotaStatus map[string]v1.ResourceQuotaStatus
}

// GetWeight returns weight of a namespace, any invalid case would get default value
func (n *NamespaceInfo) GetWeight() int64 {
if n == nil || n.Weight == 0 {
return DefaultNamespaceWeight
}
return n.Weight
}

type quotaItem struct {
name string
weight int64
}

func quotaItemKeyFunc(obj interface{}) (string, error) {
item, ok := obj.(*quotaItem)
if !ok {
return "", fmt.Errorf("obj with type %T could not parse", obj)
}
return item.name, nil
}

// for big root heap
func quotaItemLessFunc(a interface{}, b interface{}) bool {
A := a.(*quotaItem)
B := b.(*quotaItem)
return A.weight > B.weight
}

// NamespaceCollection will record all details about namespace
type NamespaceCollection struct {
Name string

quotaWeight *cache.Heap

Name string
QuotaStatus map[string]v1.ResourceQuotaStatus
}

// NewNamespaceCollection creates new NamespaceCollection object to record all information about a namespace
func NewNamespaceCollection(name string) *NamespaceCollection {
n := &NamespaceCollection{
Name: name,
quotaWeight: cache.NewHeap(quotaItemKeyFunc, quotaItemLessFunc),
QuotaStatus: make(map[string]v1.ResourceQuotaStatus),
}
// add at least one item into quotaWeight.
// Because cache.Heap.Pop would be blocked until queue is not empty
n.updateWeight(&quotaItem{
name: NamespaceWeightKey,
weight: DefaultNamespaceWeight,
})
return n
}

func (n *NamespaceCollection) deleteWeight(q *quotaItem) {
n.quotaWeight.Delete(q)
}

func (n *NamespaceCollection) updateWeight(q *quotaItem) {
n.quotaWeight.Update(q)
}

func itemFromQuota(quota *v1.ResourceQuota) *quotaItem {
var weight int64 = DefaultNamespaceWeight

quotaWeight, ok := quota.Spec.Hard[NamespaceWeightKey]
if ok {
weight = quotaWeight.Value()
}

item := &quotaItem{
name: quota.Name,
weight: weight,
}
return item
}

// Update modifies the registered information according to the quota object
func (n *NamespaceCollection) Update(quota *v1.ResourceQuota) {
n.updateWeight(itemFromQuota(quota))
n.QuotaStatus[quota.Name] = quota.Status
}

// Delete removes the registered information according to the quota object
func (n *NamespaceCollection) Delete(quota *v1.ResourceQuota) {
n.deleteWeight(itemFromQuota(quota))
delete(n.QuotaStatus, quota.Name)
}

// Snapshot will clone a NamespaceInfo without Heap according to the NamespaceCollection
func (n *NamespaceCollection) Snapshot() *NamespaceInfo {
var weight int64 = DefaultNamespaceWeight

obj, err := n.quotaWeight.Pop()
if err != nil {
klog.Warningf("namespace %s, quota weight meets error %v when pop", n.Name, err)
} else {
item := obj.(*quotaItem)
weight = item.weight
n.quotaWeight.Add(item)
}

return &NamespaceInfo{
Name: NamespaceName(n.Name),
Weight: weight,
QuotaStatus: n.QuotaStatus,
}
}
66 changes: 10 additions & 56 deletions pkg/scheduler/api/namespace_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func newQuota(name string, weight, req int) *v1.ResourceQuota {
func newQuota(name string, req int) *v1.ResourceQuota {
q := &v1.ResourceQuota{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -39,84 +39,38 @@ func newQuota(name string, weight, req int) *v1.ResourceQuota {
},
}

if weight >= 0 {
q.Spec.Hard[v1.ResourceName(NamespaceWeightKey)] = *resource.NewQuantity(int64(weight), resource.DecimalSI)
}

return q
}

func TestNamespaceCollection(t *testing.T) {
c := NewNamespaceCollection("testCollection")
c.Update(newQuota("abc", 123, 1))
c.Update(newQuota("abc", 1))

info := c.Snapshot()
req := info.QuotaStatus["abc"].Hard[v1.ResourceCPU]
if req.Value() != 1 {
t.Errorf("cpu request of quota %s should be %d, but got %d", "abc", 1, req.Value())
}

c.Update(newQuota("abc", 456, 2))
c.Update(newQuota("def", -1, 4))
c.Update(newQuota("def", 16, 8))
c.Update(newQuota("ghi", 0, 16))
c.Update(newQuota("abc", 2))
c.Update(newQuota("def", 4))
c.Update(newQuota("def", 8))
c.Update(newQuota("ghi", 16))

info = c.Snapshot()
if info.Weight != 456 {
t.Errorf("weight of namespace should be %d, but got %d", 456, info.Weight)
}
req = info.QuotaStatus["abc"].Hard[v1.ResourceCPU]
if req.Value() != 2 {
t.Errorf("cpu request of quota %s should be %d, but got %d", "abc", 2, req.Value())
}

c.Delete(newQuota("abc", 0, 0))
c.Delete(newQuota("abc", 0))

info = c.Snapshot()
if info.Weight != 16 {
t.Errorf("weight of namespace should be %d, but not %d", 16, info.Weight)
}
if _, ok := info.QuotaStatus["abc"]; ok {
t.Errorf("QuotaStatus abc of namespace should not exist")
}

c.Delete(newQuota("abc", 0, 0))
c.Delete(newQuota("def", 15, 0))
c.Delete(newQuota("ghi", -1, 0))

info = c.Snapshot()
if info.Weight != DefaultNamespaceWeight {
t.Errorf("weight of namespace should be default weight %d, but not %d", DefaultNamespaceWeight, info.Weight)
}
}

func TestEmptyNamespaceCollection(t *testing.T) {
c := NewNamespaceCollection("testEmptyCollection")

info := c.Snapshot()
if info.Weight != DefaultNamespaceWeight {
t.Errorf("weight of namespace should be %d, but not %d", DefaultNamespaceWeight, info.Weight)
}

// snapshot can be called anytime
info = c.Snapshot()
if info.Weight != DefaultNamespaceWeight {
t.Errorf("weight of namespace should be %d, but not %d", DefaultNamespaceWeight, info.Weight)
}

c.Delete(newQuota("abc", 0, 0))

info = c.Snapshot()
if info.Weight != DefaultNamespaceWeight {
t.Errorf("weight of namespace should be %d, but not %d", DefaultNamespaceWeight, info.Weight)
}

c.Delete(newQuota("abc", 0, 0))
c.Delete(newQuota("def", 15, 0))
c.Delete(newQuota("ghi", -1, 0))

info = c.Snapshot()
if info.Weight != DefaultNamespaceWeight {
t.Errorf("weight of namespace should be default weight %d, but not %d", DefaultNamespaceWeight, info.Weight)
}
c.Delete(newQuota("abc", 0))
c.Delete(newQuota("def", 0))
c.Delete(newQuota("ghi", 0))
}
Loading

0 comments on commit 4ee689a

Please sign in to comment.