Skip to content

Commit

Permalink
Merge pull request #2580 from elinx/bugfix-prefilter
Browse files Browse the repository at this point in the history
move prefilter out of predicates to improve performance
  • Loading branch information
volcano-sh-bot committed Nov 29, 2022
2 parents 089a211 + 3686b90 commit 7157ada
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 33 deletions.
10 changes: 10 additions & 0 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,16 @@ func (alloc *Action) Execute(ssn *framework.Session) {

klog.V(3).Infof("There are <%d> nodes for Job <%v/%v>", len(ssn.Nodes), job.Namespace, job.Name)

if err := ssn.PrePredicateFn(task); err != nil {
klog.V(3).Infof("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err)
fitErrors := api.NewFitErrors()
for _, ni := range allNodes {
fitErrors.SetNodeError(ni.Name, err)
}
job.NodesFitErrors[task.UID] = fitErrors
break
}

predicateNodes, fitErrors := ph.PredicateNodes(task, allNodes, predicateFn)
if len(predicateNodes) == 0 {
job.NodesFitErrors[task.UID] = fitErrors
Expand Down
9 changes: 9 additions & 0 deletions pkg/scheduler/actions/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ func (backfill *Action) Execute(ssn *framework.Session) {
allocated := false
fe := api.NewFitErrors()

if err := ssn.PrePredicateFn(task); err != nil {
klog.V(3).Infof("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err)
for _, ni := range ssn.Nodes {
fe.SetNodeError(ni.Name, err)
}
job.NodesFitErrors[task.UID] = fe
break
}

// As task did not request resources, so it only need to meet predicates.
// TODO (k82cn): need to prioritize nodes to avoid pod hole.
for _, node := range ssn.Nodes {
Expand Down
3 changes: 3 additions & 0 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ func preempt(

allNodes := ssn.NodeList

if err := ssn.PrePredicateFn(preemptor); err != nil {
return false, fmt.Errorf("PrePredicate for task %s/%s failed for: %v", preemptor.Namespace, preemptor.Name, err)
}
predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, ssn.PredicateFn)

nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
Expand Down
5 changes: 5 additions & 0 deletions pkg/scheduler/actions/reclaim/reclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ func (ra *Action) Execute(ssn *framework.Session) {
continue
}

if err := ssn.PrePredicateFn(task); err != nil {
klog.V(3).Infof("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err)
continue
}

assigned := false
for _, n := range ssn.Nodes {
// If predicates failed, next node.
Expand Down
3 changes: 3 additions & 0 deletions pkg/scheduler/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ type JobEnqueuedFn func(interface{})
// PredicateFn is the func declaration used to predicate node for task.
// It receives the task and one candidate node and returns an error.
type PredicateFn func(*TaskInfo, *NodeInfo) error

// PrePredicateFn is the func declaration used to pre-predicate a task.
// Unlike PredicateFn it receives only the task, not a node, and returns an error.
type PrePredicateFn func(*TaskInfo) error

// BestNodeFn is the func declaration used to return the nodeScores to plugins.
// It receives the task and the nodes grouped under float64 keys (presumably
// their scores; confirm against callers) and returns a single node.
type BestNodeFn func(*TaskInfo, map[float64][]*NodeInfo) *NodeInfo

Expand Down
2 changes: 2 additions & 0 deletions pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type Session struct {
namespaceOrderFns map[string]api.CompareFn
clusterOrderFns map[string]api.CompareFn
predicateFns map[string]api.PredicateFn
prePredicateFns map[string]api.PrePredicateFn
bestNodeFns map[string]api.BestNodeFn
nodeOrderFns map[string]api.NodeOrderFn
batchNodeOrderFns map[string]api.BatchNodeOrderFn
Expand Down Expand Up @@ -118,6 +119,7 @@ func openSession(cache cache.Cache) *Session {
namespaceOrderFns: map[string]api.CompareFn{},
clusterOrderFns: map[string]api.CompareFn{},
predicateFns: map[string]api.PredicateFn{},
prePredicateFns: map[string]api.PrePredicateFn{},
bestNodeFns: map[string]api.BestNodeFn{},
nodeOrderFns: map[string]api.NodeOrderFn{},
batchNodeOrderFns: map[string]api.BatchNodeOrderFn{},
Expand Down
26 changes: 26 additions & 0 deletions pkg/scheduler/framework/session_plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ func (ssn *Session) AddPredicateFn(name string, pf api.PredicateFn) {
ssn.predicateFns[name] = pf
}

// AddPrePredicateFn registers pf as the PrePredicate function stored under
// name, replacing any function previously registered with the same name.
func (ssn *Session) AddPrePredicateFn(name string, pf api.PrePredicateFn) {
ssn.prePredicateFns[name] = pf
}

// AddBestNodeFn add BestNode function
func (ssn *Session) AddBestNodeFn(name string, pf api.BestNodeFn) {
ssn.bestNodeFns[name] = pf
Expand Down Expand Up @@ -639,6 +644,27 @@ func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) error {
return nil
}

// PrePredicateFn invokes the pre-predicate functions of the plugins.
//
// Tiers and the plugins inside each tier are visited in order. A plugin is
// consulted only when isEnabled accepts its EnabledPredicate option and a
// function is registered under the plugin's name in prePredicateFns. The first
// non-nil error is returned immediately; nil is returned when no consulted
// function reports an error.
func (ssn *Session) PrePredicateFn(task *api.TaskInfo) error {
	for _, tier := range ssn.Tiers {
		for _, plugin := range tier.Plugins {
			// Pre-predicates are gated by the same option as predicates.
			if isEnabled(plugin.EnabledPredicate) {
				if check, registered := ssn.prePredicateFns[plugin.Name]; registered {
					if err := check(task); err != nil {
						return err
					}
				}
			}
		}
	}
	return nil
}

// BestNodeFn invoke bestNode function of the plugins
func (ssn *Session) BestNodeFn(task *api.TaskInfo, nodeScores map[float64][]*api.NodeInfo) *api.NodeInfo {
for _, tier := range ssn.Tiers {
Expand Down
72 changes: 39 additions & 33 deletions pkg/scheduler/plugins/predicates/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,44 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
plugin, _ = podtopologyspread.New(ptsArgs, handle, features)
podTopologySpreadFilter := plugin.(*podtopologyspread.PodTopologySpread)

state := k8sframework.NewCycleState()

ssn.AddPrePredicateFn(pp.Name(), func(task *api.TaskInfo) error {
// Check NodePorts
nodePortFilter.PreFilter(context.TODO(), state, task.Pod)

// InterPodAffinity Predicate
// TODO: Update the node information to be processed by the filter based on the node list returned by the prefilter.
// In K8S V1.25, a return value result was added to the PreFilter interface,
// indicating the list of nodes that meet the filtering conditions.
// If the value of result is nil, all nodes meet the conditions.
// If the specified node information exists, only the node information in result meets the conditions.
// PreFilter in the current InterPodAffinity package always returns a nil result,
// so the outer layer does not need to process it for now.
// If filtering logic is added to the PreFilter stage in the Volumebinding package in the future,
// processing logic needs to be added for the return value result.
_, status := podAffinityFilter.PreFilter(context.TODO(), state, task.Pod)
if !status.IsSuccess() {
return fmt.Errorf("plugin %s pre-predicates failed %s", interpodaffinity.Name, status.Message())
}

// Check PodTopologySpread
// TODO: Update the node information to be processed by the filter based on the node list returned by the prefilter.
// In K8S V1.25, a return value result was added to the PreFilter interface,
// indicating the list of nodes that meet the filtering conditions.
// If the value of result is nil, all nodes meet the conditions.
// If the specified node information exists, only the node information in result meets the conditions.
// PreFilter in the current PodTopologySpread package always returns a nil result,
// so the outer layer does not need to process it for now.
// If filtering logic is added to the PreFilter stage in the Volumebinding package in the future,
// processing logic needs to be added for the return value result.
_, status = podTopologySpreadFilter.PreFilter(context.TODO(), state, task.Pod)
if !status.IsSuccess() {
return fmt.Errorf("plugin %s pre-predicates failed %s", podTopologySpreadFilter.Name(), status.Message())
}
return nil
})

ssn.AddPredicateFn(pp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error {
nodeInfo, found := nodeMap[node.Name]
if !found {
Expand All @@ -358,7 +396,6 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
return api.NewFitError(task, node, api.NodePodNumberExceeded)
}

state := k8sframework.NewCycleState()
predicateByStablefilter := func(pod *v1.Pod, nodeInfo *k8sframework.NodeInfo) (bool, error) {
// CheckNodeUnschedulable
status := nodeUnscheduleFilter.Filter(context.TODO(), state, task.Pod, nodeInfo)
Expand Down Expand Up @@ -402,26 +439,9 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
return err
}

// Check NodePorts
nodePortFilter.PreFilter(context.TODO(), state, task.Pod)
status := nodePortFilter.Filter(context.TODO(), state, nil, nodeInfo)
if !status.IsSuccess() {
return fmt.Errorf("plugin %s predicates failed %s", nodeaffinity.Name, status.Message())
}

// InterPodAffinity Predicate
// TODO: Update the node information to be processed by the filer based on the node list returned by the prefilter.
// In K8S V1.25, the return value result is added to the Prefile interface,
// indicating the list of nodes that meet filtering conditions.
// If the value of result is nil, all nodes meet the conditions.
// If the specified node information exists, only the node information in result meets the conditions.
// The value of Prefile in the current InterPodAffinity package always returns nil.
// The outer layer does not need to be processed temporarily.
// If the filtering logic is added to the Prefile node in the Volumebinding package in the future,
// the processing logic needs to be added to the return value result.
_, status = podAffinityFilter.PreFilter(context.TODO(), state, task.Pod)
if !status.IsSuccess() {
return fmt.Errorf("plugin %s pre-predicates failed %s", interpodaffinity.Name, status.Message())
return fmt.Errorf("plugin %s predicates failed %s", nodeports.Name, status.Message())
}

status = podAffinityFilter.Filter(context.TODO(), state, task.Pod, nodeInfo)
Expand All @@ -441,20 +461,6 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
return fmt.Errorf("plugin %s predicates failed %s", volumeZoneFilter.Name(), status.Message())
}

// Check PodTopologySpread
// TODO: Update the node information to be processed by the filer based on the node list returned by the prefilter.
// In K8S V1.25, the return value result is added to the Prefile interface,
// indicating the list of nodes that meet filtering conditions.
// If the value of result is nil, all nodes meet the conditions.
// If the specified node information exists, only the node information in result meets the conditions.
// The value of Prefile in the current PodTopologySpread package always returns nil.
// The outer layer does not need to be processed temporarily.
// If the filtering logic is added to the Prefile node in the Volumebinding package in the future,
// the processing logic needs to be added to the return value result.
_, status = podTopologySpreadFilter.PreFilter(context.TODO(), state, task.Pod)
if !status.IsSuccess() {
return fmt.Errorf("plugin %s pre-predicates failed %s", podTopologySpreadFilter.Name(), status.Message())
}
status = podTopologySpreadFilter.Filter(context.TODO(), state, task.Pod, nodeInfo)
if !status.IsSuccess() {
return fmt.Errorf("plugin %s predicates failed %s", podTopologySpreadFilter.Name(), status.Message())
Expand Down

0 comments on commit 7157ada

Please sign in to comment.