sch gates commit
Signed-off-by: ykcai-daniel <1155141377@link.cuhk.edu.hk>
ykcai-daniel committed Sep 6, 2024
1 parent fada548 commit 64b16e1
Showing 17 changed files with 400 additions and 37 deletions.
6 changes: 6 additions & 0 deletions pkg/scheduler/actions/allocate/allocate.go
@@ -138,6 +138,12 @@ func (alloc *Action) allocateResources(queues *util.PriorityQueue, jobsMap map[a
if _, found = pendingTasks[job.UID]; !found {
tasks := util.NewPriorityQueue(ssn.TaskOrderFn)
for _, task := range job.TaskStatusIndex[api.Pending] {

// Skip tasks whose pods are scheduling gated
if task.SchedulingGated() {
continue
}

// Skip BestEffort task in 'allocate' action.
if task.Resreq.IsEmpty() {
klog.V(4).Infof("Task <%v/%v> is BestEffort task, skip it.",
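For context, a pod is scheduling gated when its `spec.schedulingGates` list is non-empty; the scheduler must not bind it until all gates are removed. A minimal sketch of such a pod and the check the actions in this commit rely on (the gate name is invented for illustration):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// A pod that an external controller is expected to "ungate" later.
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "gated-pod"},
		Spec: v1.PodSpec{
			SchedulingGates: []v1.PodSchedulingGate{{Name: "example.com/quota-check"}},
		},
	}
	// A non-empty schedulingGates slice means the pod must be skipped by
	// allocate/backfill/preempt/reclaim until the gates are cleared.
	fmt.Println(len(pod.Spec.SchedulingGates) > 0) // true
}
```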
5 changes: 5 additions & 0 deletions pkg/scheduler/actions/backfill/backfill.go
@@ -121,6 +121,11 @@ func (backfill *Action) pickUpPendingTasks(ssn *framework.Session) []*api.TaskIn
if !task.BestEffort {
continue
}

if task.SchedulingGated() {
continue
}

if _, existed := tasks[job.UID]; !existed {
tasks[job.UID] = util.NewPriorityQueue(ssn.TaskOrderFn)
}
7 changes: 7 additions & 0 deletions pkg/scheduler/actions/preempt/preempt.go
@@ -76,6 +76,9 @@ func (pmpt *Action) Execute(ssn *framework.Session) {
underRequest = append(underRequest, job)
preemptorTasks[job.UID] = util.NewPriorityQueue(ssn.TaskOrderFn)
for _, task := range job.TaskStatusIndex[api.Pending] {
if task.SchedulingGated() {
continue
}
preemptorTasks[job.UID].Push(task)
}
}
@@ -155,6 +158,10 @@ func (pmpt *Action) Execute(ssn *framework.Session) {
// Fix: preemptor numbers are lost when tasks are in the same job
preemptorTasks[job.UID] = util.NewPriorityQueue(ssn.TaskOrderFn)
for _, task := range job.TaskStatusIndex[api.Pending] {
// Again, skip scheduling gated tasks
if task.SchedulingGated() {
continue
}
preemptorTasks[job.UID].Push(task)
}
for {
3 changes: 3 additions & 0 deletions pkg/scheduler/actions/reclaim/reclaim.go
@@ -75,6 +75,9 @@ func (ra *Action) Execute(ssn *framework.Session) {
preemptorsMap[job.Queue].Push(job)
preemptorTasks[job.UID] = util.NewPriorityQueue(ssn.TaskOrderFn)
for _, task := range job.TaskStatusIndex[api.Pending] {
if task.SchedulingGated() {
continue
}
preemptorTasks[job.UID].Push(task)
}
}
64 changes: 62 additions & 2 deletions pkg/scheduler/api/job_info.go
@@ -28,12 +28,14 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"

"k8s.io/kubernetes/pkg/features"

batch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
"volcano.sh/apis/pkg/apis/scheduling"
"volcano.sh/apis/pkg/apis/scheduling/v1beta1"

volumescheduling "volcano.sh/volcano/pkg/scheduler/capabilities/volumebinding"
)

@@ -124,6 +126,7 @@ type TaskInfo struct {
Preemptable bool
BestEffort bool
HasRestartableInitContainer bool
schGated bool

// RevocableZone supports setting volcano.sh/revocable-zone annotation or label for pod/podgroup
// we only support empty value or * value for this version and we will support specify revocable zone name for future releases
@@ -178,7 +181,8 @@ func NewTaskInfo(pod *v1.Pod) *TaskInfo {
topologyInfo := GetPodTopologyInfo(pod)
role := getTaskRole(pod)
hasRestartableInitContainer := hasRestartableInitContainer(pod)

// initialize pod scheduling gates info here since it will not change in a scheduling cycle
schGated := calSchedulingGated(pod)
jobID := getJobID(pod)

ti := &TaskInfo{
@@ -196,6 +200,7 @@ func NewTaskInfo(pod *v1.Pod) *TaskInfo {
HasRestartableInitContainer: hasRestartableInitContainer,
RevocableZone: revocableZone,
NumaInfo: topologyInfo,
schGated: schGated,
TransactionContext: TransactionContext{
NodeName: pod.Spec.NodeName,
Status: getTaskStatus(pod),
@@ -231,6 +236,22 @@ func (ti *TaskInfo) ClearLastTxContext() {
ti.LastTransaction = nil
}

// calSchedulingGated reports whether the pod of a task is scheduling gated,
// i.e. whether its schedulingGates field is non-empty.
// When the Pod is not yet created or the schedulingGates field is not set, it returns false.
func calSchedulingGated(pod *v1.Pod) bool {
// Only check gates when the PodSchedulingReadiness feature gate is enabled
if utilfeature.DefaultFeatureGate.Enabled(features.PodSchedulingReadiness) {
return pod != nil && len(pod.Spec.SchedulingGates) != 0
}
return false
}

// SchedulingGated reports whether the task's pod is scheduling gated; ti.schGated is initialized in NewTaskInfo.
func (ti *TaskInfo) SchedulingGated() bool {
return ti.schGated
}
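A table-driven sketch of the intended semantics of `calSchedulingGated` (an illustrative test, assuming it runs in the same package with the PodSchedulingReadiness feature gate enabled):

```go
func TestCalSchedulingGated(t *testing.T) {
	cases := []struct {
		name string
		pod  *v1.Pod
		want bool
	}{
		{"nil pod", nil, false},
		{"gates field unset", &v1.Pod{}, false},
		{"empty gates", &v1.Pod{Spec: v1.PodSpec{SchedulingGates: []v1.PodSchedulingGate{}}}, false},
		{"one gate", &v1.Pod{Spec: v1.PodSpec{SchedulingGates: []v1.PodSchedulingGate{{Name: "example.com/gate"}}}}, true},
	}
	for _, c := range cases {
		if got := calSchedulingGated(c.pod); got != c.want {
			t.Errorf("%s: calSchedulingGated() = %v, want %v", c.name, got, c.want)
		}
	}
}
```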

func (ti *TaskInfo) SetPodResourceDecision() error {
if ti.NumaInfo == nil || len(ti.NumaInfo.ResMap) == 0 {
return nil
@@ -273,6 +294,7 @@ func (ti *TaskInfo) Clone() *TaskInfo {
HasRestartableInitContainer: ti.HasRestartableInitContainer,
RevocableZone: ti.RevocableZone,
NumaInfo: ti.NumaInfo.Clone(),
schGated: ti.schGated,
TransactionContext: TransactionContext{
NodeName: ti.NodeName,
Status: ti.Status,
@@ -518,6 +540,44 @@ func (ji *JobInfo) GetMinResources() *Resource {
return NewResource(*ji.PodGroup.Spec.MinResources)
}

// GetSchGatedPodResources returns the total resources of tasks whose pods are scheduling gated.
// By definition, a scheduling gated pod is in the Pending phase.
func (ji *JobInfo) GetSchGatedPodResources() *Resource {
res := EmptyResource()
for _, task := range ji.Tasks {
if task.SchedulingGated() {
res.Add(task.Resreq)
}
}
return res
}

// DeductSchGatedResources deducts the resources of scheduling gated pods from Resource res;
// if a dimension of res is less than the gated resources, it is clamped to zero.
// Note: the purpose of this function is to deduct the resources of scheduling gated tasks
// in a job when calculating inqueue resources, so that they do not block other jobs from being inqueued.
func (ji *JobInfo) DeductSchGatedResources(res *Resource) *Resource {
schGatedResource := ji.GetSchGatedPodResources()
// Most jobs do not have any scheduling gated tasks, hence this shortcut
if schGatedResource.IsEmpty() {
return res
}

result := res.Clone()
// schGatedResource can be larger than MinResource because a job's minAvailable can be smaller than its number of replicas
result.MilliCPU = max(result.MilliCPU-schGatedResource.MilliCPU, 0)
result.Memory = max(result.Memory-schGatedResource.Memory, 0)

// Deduct a scalar resource only when it appears in both res and schGatedResource; others are left unchanged
for name, resource := range res.ScalarResources {
if schGatedRes, ok := schGatedResource.ScalarResources[name]; ok {
result.ScalarResources[name] = max(resource-schGatedRes, 0)
}
}
klog.V(3).Infof("Gated resources: %s, MinResource: %s: Result: %s", schGatedResource.String(), res.String(), result.String())
return result
}
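A rough worked example of the deduction (numbers invented): a job whose minResources is 4 CPU / 8Gi, with one gated task requesting 1 CPU / 2Gi, should contribute only 3 CPU / 6Gi to the queue's inqueue total. A sketch of what DeductSchGatedResources computes for the two built-in dimensions:

```go
// Illustrative values in milli-CPU and bytes; Resource is the type above.
minReq := &Resource{MilliCPU: 4000, Memory: 8 * 1024 * 1024 * 1024}
gated := &Resource{MilliCPU: 1000, Memory: 2 * 1024 * 1024 * 1024}

result := minReq.Clone()
result.MilliCPU = max(result.MilliCPU-gated.MilliCPU, 0) // 3000m CPU
result.Memory = max(result.Memory-gated.Memory, 0)       // 6Gi
```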

func (ji *JobInfo) GetElasticResources() *Resource {
minResource := ji.GetMinResources()
if ji.Allocated.LessEqualPartly(minResource, Zero) {
15 changes: 0 additions & 15 deletions pkg/scheduler/api/resource_info_test.go
@@ -818,21 +818,6 @@ func TestLessEqualWithDimension(t *testing.T) {
},
expected: true,
},
{
resource1: &Resource{
MilliCPU: 110, Memory: 4000,
ScalarResources: map[v1.ResourceName]float64{"nvidia.com/gpu": 1, "nvidia.com/A100": 1, "scalar": 1},
},
resource2: &Resource{
MilliCPU: 100, Memory: 8000,
ScalarResources: map[v1.ResourceName]float64{"nvidia.com/A100": 1, "scalar": 1},
},
req: &Resource{
Memory: 4000,
ScalarResources: map[v1.ResourceName]float64{"nvidia.com/gpu": 0, "nvidia.com/A100": 1, "scalar": 1},
},
expected: true,
},
}

for i, test := range tests {
8 changes: 8 additions & 0 deletions pkg/scheduler/cache/cache.go
@@ -1443,6 +1443,7 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *schedulingapi.JobInfo, updat
len(job.TaskStatusIndex[schedulingapi.Pending]),
len(job.Tasks),
fitErrStr)
// TODO: should we skip pod unschedulable event if pod group is unschedulable due to gates to avoid printing too many messages?
sc.recordPodGroupEvent(job.PodGroup, v1.EventTypeWarning, string(scheduling.PodGroupUnschedulableType), msg)
} else if updatePG {
sc.recordPodGroupEvent(job.PodGroup, v1.EventTypeNormal, string(scheduling.PodGroupScheduled), string(scheduling.PodGroupReady))
@@ -1455,6 +1456,13 @@
// Update podCondition for tasks Allocated and Pending before job discarded
for _, status := range []schedulingapi.TaskStatus{schedulingapi.Allocated, schedulingapi.Pending, schedulingapi.Pipelined} {
for _, taskInfo := range job.TaskStatusIndex[status] {

// The pod of a scheduling gated task is given
// the SchedulingGated condition by the api-server. Do not change it.
if taskInfo.SchedulingGated() {
continue
}

reason, msg, nominatedNodeName := job.TaskSchedulingReason(taskInfo.UID)
if len(msg) == 0 {
msg = baseErrorMessage
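For reference, since Kubernetes 1.26 a gated pod carries a `PodScheduled` condition with reason `SchedulingGated`, which is the condition the comment above preserves. Roughly (message text approximate):

```go
cond := v1.PodCondition{
	Type:   v1.PodScheduled,
	Status: v1.ConditionFalse,
	// v1.PodReasonSchedulingGated == "SchedulingGated"
	Reason:  v1.PodReasonSchedulingGated,
	Message: "Scheduling is blocked until the pod's scheduling gates are empty",
}
_ = cond
```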
14 changes: 8 additions & 6 deletions pkg/scheduler/plugins/capacity/capacity.go
@@ -142,7 +142,9 @@ func (cp *capacityPlugin) OnSessionOpen(ssn *framework.Session) {
}

if job.PodGroup.Status.Phase == scheduling.PodGroupInqueue {
attr.inqueue.Add(job.GetMinResources())
// deduct the resources of scheduling gated tasks in a job when calculating inqueued resources
// so that it will not block other jobs from being inqueued.
attr.inqueue.Add(job.DeductSchGatedResources(job.GetMinResources()))
}

// calculate inqueue resource for running jobs
@@ -152,7 +154,7 @@ func (cp *capacityPlugin) OnSessionOpen(ssn *framework.Session) {
job.PodGroup.Spec.MinResources != nil &&
int32(util.CalculateAllocatedTaskNum(job)) >= job.PodGroup.Spec.MinMember {
inqueued := util.GetInqueueResource(job, job.Allocated)
attr.inqueue.Add(inqueued)
attr.inqueue.Add(job.DeductSchGatedResources(inqueued))
}
attr.elastic.Add(job.GetElasticResources())
klog.V(5).Infof("Queue %s allocated <%s> request <%s> inqueue <%s> elastic <%s>",
@@ -293,12 +295,12 @@ func (cp *capacityPlugin) OnSessionOpen(ssn *framework.Session) {
}

if job.PodGroup.Spec.MinResources == nil {
klog.V(4).Infof("job %s MinResources is null.", job.Name)
klog.V(3).Infof("job %s MinResources is null.", job.Name)
return util.Permit
}
minReq := job.GetMinResources()

klog.V(5).Infof("job %s min resource <%s>, queue %s capability <%s> allocated <%s> inqueue <%s> elastic <%s>",
klog.V(3).Infof("job %s min resource <%s>, queue %s capability <%s> allocated <%s> inqueue <%s> elastic <%s>",
job.Name, minReq.String(), queue.Name, attr.realCapability.String(), attr.allocated.String(), attr.inqueue.String(), attr.elastic.String())
// The queue resource quota limit has not reached
r := minReq.Add(attr.allocated).Add(attr.inqueue).Sub(attr.elastic)
@@ -311,9 +313,9 @@ func (cp *capacityPlugin) OnSessionOpen(ssn *framework.Session) {
}

inqueue := r.LessEqual(rr, api.Infinity)
klog.V(5).Infof("job %s inqueue %v", job.Name, inqueue)
klog.V(3).Infof("job %s inqueue %v", job.Name, inqueue)
if inqueue {
attr.inqueue.Add(job.GetMinResources())
attr.inqueue.Add(job.DeductSchGatedResources(job.GetMinResources()))
return util.Permit
}
ssn.RecordPodGroupEvent(job.PodGroup, v1.EventTypeNormal, string(scheduling.PodGroupUnschedulableType), "queue resource quota insufficient")
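Putting the capacity check together, a hedged numeric sketch (values invented, names mirror the plugin's attr fields): admission still tests the job's full minReq against the queue, but only the non-gated share is recorded into inqueue, so gated pods do not hold back later jobs:

```go
package main

import "fmt"

func main() {
	// Illustrative milli-CPU numbers only.
	capability, allocated, inqueue, elastic := 10000.0, 6000.0, 1000.0, 0.0
	minReq, gated := 3000.0, 1000.0 // gated = share of minReq on gated tasks

	// The admission test still uses the full minReq...
	if minReq+allocated+inqueue-elastic <= capability {
		// ...but only the non-gated share is recorded as inqueue, so
		// pods that cannot schedule yet do not block later jobs.
		inqueue += minReq - gated
	}
	fmt.Println(inqueue) // 3000 (1000 existing + 2000 newly admitted)
}
```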
3 changes: 3 additions & 0 deletions pkg/scheduler/plugins/gang/gang.go
@@ -183,6 +183,9 @@ func (gp *gangPlugin) OnSessionClose(ssn *framework.Session) {
unScheduleJobCount++
metrics.RegisterJobRetries(job.Name)

// TODO: If the Job is gang-unschedulable due to scheduling gates,
// we need a new message and reason to tell users.
// More details in the design doc pod-scheduling-readiness.md
jc := &scheduling.PodGroupCondition{
Type: scheduling.PodGroupUnschedulableType,
Status: v1.ConditionTrue,
14 changes: 8 additions & 6 deletions pkg/scheduler/plugins/overcommit/overcommit.go
@@ -93,7 +93,9 @@ func (op *overcommitPlugin) OnSessionOpen(ssn *framework.Session) {
for _, job := range ssn.Jobs {
// calculate inqueue job resources
if job.PodGroup.Status.Phase == scheduling.PodGroupInqueue && job.PodGroup.Spec.MinResources != nil {
op.inqueueResource.Add(api.NewResource(*job.PodGroup.Spec.MinResources))
// deduct the resources of scheduling gated tasks in a job when calculating inqueued resources
// so that it will not block other jobs from being inqueued.
op.inqueueResource.Add(job.DeductSchGatedResources(job.GetMinResources()))
continue
}
// calculate inqueue resource for running jobs
@@ -103,7 +105,7 @@ func (op *overcommitPlugin) OnSessionOpen(ssn *framework.Session) {
job.PodGroup.Spec.MinResources != nil &&
int32(util.CalculateAllocatedTaskNum(job)) >= job.PodGroup.Spec.MinMember {
inqueued := util.GetInqueueResource(job, job.Allocated)
op.inqueueResource.Add(inqueued)
op.inqueueResource.Add(job.DeductSchGatedResources(inqueued))
}
}

@@ -113,12 +115,12 @@
inqueue := api.EmptyResource()
inqueue.Add(op.inqueueResource)
if job.PodGroup.Spec.MinResources == nil {
klog.V(4).Infof("Job <%s/%s> is bestEffort, permit to be inqueue.", job.Namespace, job.Name)
klog.V(3).Infof("Job <%s/%s> is bestEffort, permit to be inqueue.", job.Namespace, job.Name)
return util.Permit
}

// TODO: if we allow one more job to be inqueue beyond the overcommit-factor, a large job may be inqueued and create pods
jobMinReq := api.NewResource(*job.PodGroup.Spec.MinResources)
jobMinReq := job.GetMinResources()
if inqueue.Add(jobMinReq).LessEqualWithDimension(idle, jobMinReq) { // only compare the requested resource
klog.V(4).Infof("Sufficient resources, permit job <%s/%s> to be inqueue", job.Namespace, job.Name)
return util.Permit
@@ -134,8 +136,8 @@
if job.PodGroup.Spec.MinResources == nil {
return
}
jobMinReq := api.NewResource(*job.PodGroup.Spec.MinResources)
op.inqueueResource.Add(jobMinReq)
jobMinReq := job.GetMinResources()
op.inqueueResource.Add(job.DeductSchGatedResources(jobMinReq))
})
}

17 changes: 10 additions & 7 deletions pkg/scheduler/plugins/proportion/proportion.go
@@ -143,7 +143,7 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
}

if job.PodGroup.Status.Phase == scheduling.PodGroupInqueue {
attr.inqueue.Add(job.GetMinResources())
attr.inqueue.Add(job.DeductSchGatedResources(job.GetMinResources()))
}

// calculate inqueue resource for running jobs
@@ -153,10 +155,11 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
job.PodGroup.Spec.MinResources != nil &&
int32(util.CalculateAllocatedTaskNum(job)) >= job.PodGroup.Spec.MinMember {
inqueued := util.GetInqueueResource(job, job.Allocated)
attr.inqueue.Add(inqueued)
// deduct scheduling gated tasks from inqueue resources
attr.inqueue.Add(job.DeductSchGatedResources(inqueued))
}
attr.elastic.Add(job.GetElasticResources())
klog.V(5).Infof("Queue %s allocated <%s> request <%s> inqueue <%s> elastic <%s>",
klog.V(3).Infof("Queue %s allocated <%s> request <%s> inqueue <%s> elastic <%s>",
attr.name, attr.allocated.String(), attr.request.String(), attr.inqueue.String(), attr.elastic.String())
}

@@ -331,7 +332,7 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
queue := ssn.Queues[queueID]
// If no capability is set, always enqueue the job.
if attr.realCapability == nil {
klog.V(4).Infof("Capability of queue <%s> was not set, allow job <%s/%s> to Inqueue.",
klog.V(3).Infof("Capability of queue <%s> was not set, allow job <%s/%s> to Inqueue.",
queue.Name, job.Namespace, job.Name)
return util.Permit
}
@@ -342,7 +343,7 @@
}
minReq := job.GetMinResources()

klog.V(5).Infof("job %s min resource <%s>, queue %s capability <%s> allocated <%s> inqueue <%s> elastic <%s>",
klog.V(3).Infof("job %s min resource <%s>, queue %s capability <%s> allocated <%s> inqueue <%s> elastic <%s>",
job.Name, minReq.String(), queue.Name, attr.realCapability.String(), attr.allocated.String(), attr.inqueue.String(), attr.elastic.String())
// The queue resource quota limit has not reached
r := minReq.Add(attr.allocated).Add(attr.inqueue).Sub(attr.elastic)
@@ -355,9 +356,11 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
}

inqueue := r.LessEqual(rr, api.Infinity)
klog.V(5).Infof("job %s inqueue %v", job.Name, inqueue)
klog.V(3).Infof("job %s inqueue %v", job.Name, inqueue)
if inqueue {
attr.inqueue.Add(job.GetMinResources())
// deduct the resources of scheduling gated tasks in a job when calculating inqueued resources
// so that it will not block other jobs from being inqueued.
attr.inqueue.Add(job.DeductSchGatedResources(minReq))
return util.Permit
}
ssn.RecordPodGroupEvent(job.PodGroup, v1.EventTypeNormal, string(scheduling.PodGroupUnschedulableType), "queue resource quota insufficient")
4 changes: 4 additions & 0 deletions pkg/webhooks/admission/jobs/validate/admit_job.go
@@ -276,6 +276,10 @@ func validateJobUpdate(old, new *v1alpha1.Job) error {
// other fields under spec are not allowed to mutate
new.Spec.MinAvailable = old.Spec.MinAvailable
new.Spec.PriorityClassName = old.Spec.PriorityClassName

// Kubernetes also permits mutating spec.schedulingGates;
// we do not support this for vcjob (more details in the design doc pod-scheduling-readiness.md)

for i := range new.Spec.Tasks {
new.Spec.Tasks[i].Replicas = old.Spec.Tasks[i].Replicas
new.Spec.Tasks[i].MinAvailable = old.Spec.Tasks[i].MinAvailable