diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go
index 17b47b678c..d2806268df 100644
--- a/pkg/scheduler/actions/allocate/allocate.go
+++ b/pkg/scheduler/actions/allocate/allocate.go
@@ -198,6 +198,16 @@ func (alloc *Action) Execute(ssn *framework.Session) {
 
 			klog.V(3).Infof("There are <%d> nodes for Job <%v/%v>", len(ssn.Nodes), job.Namespace, job.Name)
 
+			if err := ssn.PrePredicateFn(task); err != nil {
+				klog.V(3).Infof("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err)
+				fitErrors := api.NewFitErrors()
+				for _, ni := range allNodes {
+					fitErrors.SetNodeError(ni.Name, err)
+				}
+				job.NodesFitErrors[task.UID] = fitErrors
+				break
+			}
+
 			predicateNodes, fitErrors := ph.PredicateNodes(task, allNodes, predicateFn)
 			if len(predicateNodes) == 0 {
 				job.NodesFitErrors[task.UID] = fitErrors
diff --git a/pkg/scheduler/actions/backfill/backfill.go b/pkg/scheduler/actions/backfill/backfill.go
index eded71e03b..c8f0364246 100644
--- a/pkg/scheduler/actions/backfill/backfill.go
+++ b/pkg/scheduler/actions/backfill/backfill.go
@@ -58,6 +58,15 @@ func (backfill *Action) Execute(ssn *framework.Session) {
 				allocated := false
 				fe := api.NewFitErrors()
 
+				if err := ssn.PrePredicateFn(task); err != nil {
+					klog.V(3).Infof("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err)
+					for _, ni := range ssn.Nodes {
+						fe.SetNodeError(ni.Name, err)
+					}
+					job.NodesFitErrors[task.UID] = fe
+					break
+				}
+
 				// As task did not request resources, so it only need to meet predicates.
 				// TODO (k82cn): need to prioritize nodes to avoid pod hole.
 				for _, node := range ssn.Nodes {
diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go
index 1f5f21340b..81256a5071 100644
--- a/pkg/scheduler/actions/preempt/preempt.go
+++ b/pkg/scheduler/actions/preempt/preempt.go
@@ -206,6 +206,9 @@ func preempt(
 
 	allNodes := ssn.NodeList
 
+	if err := ssn.PrePredicateFn(preemptor); err != nil {
+		return false, fmt.Errorf("PrePredicate for task %s/%s failed for: %v", preemptor.Namespace, preemptor.Name, err)
+	}
 	predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, ssn.PredicateFn)
 
 	nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)
diff --git a/pkg/scheduler/actions/reclaim/reclaim.go b/pkg/scheduler/actions/reclaim/reclaim.go
index ef9df4dfad..8475504496 100644
--- a/pkg/scheduler/actions/reclaim/reclaim.go
+++ b/pkg/scheduler/actions/reclaim/reclaim.go
@@ -116,6 +116,11 @@ func (ra *Action) Execute(ssn *framework.Session) {
 				continue
 			}
 
+			if err := ssn.PrePredicateFn(task); err != nil {
+				klog.V(3).Infof("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err)
+				continue
+			}
+
 			assigned := false
 			for _, n := range ssn.Nodes {
 				// If predicates failed, next node.
diff --git a/pkg/scheduler/api/types.go b/pkg/scheduler/api/types.go
index d0434912f9..699cf8a918 100644
--- a/pkg/scheduler/api/types.go
+++ b/pkg/scheduler/api/types.go
@@ -136,6 +136,9 @@ type JobEnqueuedFn func(interface{})
 // PredicateFn is the func declaration used to predicate node for task.
 type PredicateFn func(*TaskInfo, *NodeInfo) error
 
+// PrePredicateFn is the func declaration used to pre-predicate node for task.
+type PrePredicateFn func(*TaskInfo) error
+
 // BestNodeFn is the func declaration used to return the nodeScores to plugins.
 type BestNodeFn func(*TaskInfo, map[float64][]*NodeInfo) *NodeInfo
 
diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go
index 5e884293ed..965fd64f5f 100644
--- a/pkg/scheduler/framework/session.go
+++ b/pkg/scheduler/framework/session.go
@@ -73,6 +73,7 @@ type Session struct {
 	namespaceOrderFns  map[string]api.CompareFn
 	clusterOrderFns    map[string]api.CompareFn
 	predicateFns       map[string]api.PredicateFn
+	prePredicateFns    map[string]api.PrePredicateFn
 	bestNodeFns        map[string]api.BestNodeFn
 	nodeOrderFns       map[string]api.NodeOrderFn
 	batchNodeOrderFns  map[string]api.BatchNodeOrderFn
@@ -118,6 +119,7 @@ func openSession(cache cache.Cache) *Session {
 		namespaceOrderFns:  map[string]api.CompareFn{},
 		clusterOrderFns:    map[string]api.CompareFn{},
 		predicateFns:       map[string]api.PredicateFn{},
+		prePredicateFns:    map[string]api.PrePredicateFn{},
 		bestNodeFns:        map[string]api.BestNodeFn{},
 		nodeOrderFns:       map[string]api.NodeOrderFn{},
 		batchNodeOrderFns:  map[string]api.BatchNodeOrderFn{},
diff --git a/pkg/scheduler/framework/session_plugins.go b/pkg/scheduler/framework/session_plugins.go
index 5f5dd830e9..6366d5aac8 100644
--- a/pkg/scheduler/framework/session_plugins.go
+++ b/pkg/scheduler/framework/session_plugins.go
@@ -74,6 +74,11 @@ func (ssn *Session) AddPredicateFn(name string, pf api.PredicateFn) {
 	ssn.predicateFns[name] = pf
 }
 
+// AddPrePredicateFn add PrePredicate function
+func (ssn *Session) AddPrePredicateFn(name string, pf api.PrePredicateFn) {
+	ssn.prePredicateFns[name] = pf
+}
+
 // AddBestNodeFn add BestNode function
 func (ssn *Session) AddBestNodeFn(name string, pf api.BestNodeFn) {
 	ssn.bestNodeFns[name] = pf
@@ -639,6 +644,27 @@ func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) error {
 	return nil
 }
 
+// PrePredicateFn invoke pre-predicate function of the plugins
+func (ssn *Session) PrePredicateFn(task *api.TaskInfo) error {
+	for _, tier := range ssn.Tiers {
+		for _, plugin := range tier.Plugins {
+			// pre-predicates share the same enable option as predicates
+			if !isEnabled(plugin.EnabledPredicate) {
+				continue
+			}
+			pfn, found := ssn.prePredicateFns[plugin.Name]
+			if !found {
+				continue
+			}
+			err := pfn(task)
+			if err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
 // BestNodeFn invoke bestNode function of the plugins
 func (ssn *Session) BestNodeFn(task *api.TaskInfo, nodeScores map[float64][]*api.NodeInfo) *api.NodeInfo {
 	for _, tier := range ssn.Tiers {
diff --git a/pkg/scheduler/plugins/predicates/predicates.go b/pkg/scheduler/plugins/predicates/predicates.go
index 7ee921bbba..6ba0ff1293 100644
--- a/pkg/scheduler/plugins/predicates/predicates.go
+++ b/pkg/scheduler/plugins/predicates/predicates.go
@@ -346,6 +346,44 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
 	plugin, _ = podtopologyspread.New(ptsArgs, handle, features)
 	podTopologySpreadFilter := plugin.(*podtopologyspread.PodTopologySpread)
 
+	state := k8sframework.NewCycleState()
+
+	ssn.AddPrePredicateFn(pp.Name(), func(task *api.TaskInfo) error {
+		// Check NodePorts
+		nodePortFilter.PreFilter(context.TODO(), state, task.Pod)
+
+		// InterPodAffinity Predicate
+		// TODO: Update the node information to be processed by the filter based on the node list returned by the prefilter.
+		// In K8S V1.25, the return value result is added to the PreFilter interface,
+		// indicating the list of nodes that meet filtering conditions.
+		// If the value of result is nil, all nodes meet the conditions.
+		// If the specified node information exists, only the node information in result meets the conditions.
+		// The value of PreFilter in the current InterPodAffinity package always returns nil.
+		// The outer layer does not need to be processed temporarily.
+		// If filtering logic is added to the PreFilter of the InterPodAffinity package in the future,
+		// the processing logic needs to be added to the return value result.
+		_, status := podAffinityFilter.PreFilter(context.TODO(), state, task.Pod)
+		if !status.IsSuccess() {
+			return fmt.Errorf("plugin %s pre-predicates failed %s", interpodaffinity.Name, status.Message())
+		}
+
+		// Check PodTopologySpread
+		// TODO: Update the node information to be processed by the filter based on the node list returned by the prefilter.
+		// In K8S V1.25, the return value result is added to the PreFilter interface,
+		// indicating the list of nodes that meet filtering conditions.
+		// If the value of result is nil, all nodes meet the conditions.
+		// If the specified node information exists, only the node information in result meets the conditions.
+		// The value of PreFilter in the current PodTopologySpread package always returns nil.
+		// The outer layer does not need to be processed temporarily.
+		// If filtering logic is added to the PreFilter of the PodTopologySpread package in the future,
+		// the processing logic needs to be added to the return value result.
+		_, status = podTopologySpreadFilter.PreFilter(context.TODO(), state, task.Pod)
+		if !status.IsSuccess() {
+			return fmt.Errorf("plugin %s pre-predicates failed %s", podTopologySpreadFilter.Name(), status.Message())
+		}
+		return nil
+	})
+
 	ssn.AddPredicateFn(pp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error {
 		nodeInfo, found := nodeMap[node.Name]
 		if !found {
@@ -358,7 +396,6 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
 			return api.NewFitError(task, node, api.NodePodNumberExceeded)
 		}
 
-		state := k8sframework.NewCycleState()
 		predicateByStablefilter := func(pod *v1.Pod, nodeInfo *k8sframework.NodeInfo) (bool, error) {
 			// CheckNodeUnschedulable
 			status := nodeUnscheduleFilter.Filter(context.TODO(), state, task.Pod, nodeInfo)
@@ -402,26 +439,9 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
 			return err
 		}
 
-		// Check NodePorts
-		nodePortFilter.PreFilter(context.TODO(), state, task.Pod)
 		status := nodePortFilter.Filter(context.TODO(), state, nil, nodeInfo)
 		if !status.IsSuccess() {
-			return fmt.Errorf("plugin %s predicates failed %s", nodeaffinity.Name, status.Message())
-		}
-
-		// InterPodAffinity Predicate
-		// TODO: Update the node information to be processed by the filer based on the node list returned by the prefilter.
-		// In K8S V1.25, the return value result is added to the Prefile interface,
-		// indicating the list of nodes that meet filtering conditions.
-		// If the value of result is nil, all nodes meet the conditions.
-		// If the specified node information exists, only the node information in result meets the conditions.
-		// The value of Prefile in the current InterPodAffinity package always returns nil.
-		// The outer layer does not need to be processed temporarily.
-		// If the filtering logic is added to the Prefile node in the Volumebinding package in the future,
-		// the processing logic needs to be added to the return value result.
-		_, status = podAffinityFilter.PreFilter(context.TODO(), state, task.Pod)
-		if !status.IsSuccess() {
-			return fmt.Errorf("plugin %s pre-predicates failed %s", interpodaffinity.Name, status.Message())
+			return fmt.Errorf("plugin %s predicates failed %s", nodeports.Name, status.Message())
 		}
 
 		status = podAffinityFilter.Filter(context.TODO(), state, task.Pod, nodeInfo)
@@ -441,20 +461,6 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
 			return fmt.Errorf("plugin %s predicates failed %s", volumeZoneFilter.Name(), status.Message())
 		}
 
-		// Check PodTopologySpread
-		// TODO: Update the node information to be processed by the filer based on the node list returned by the prefilter.
-		// In K8S V1.25, the return value result is added to the Prefile interface,
-		// indicating the list of nodes that meet filtering conditions.
-		// If the value of result is nil, all nodes meet the conditions.
-		// If the specified node information exists, only the node information in result meets the conditions.
-		// The value of Prefile in the current PodTopologySpread package always returns nil.
-		// The outer layer does not need to be processed temporarily.
-		// If the filtering logic is added to the Prefile node in the Volumebinding package in the future,
-		// the processing logic needs to be added to the return value result.
-		_, status = podTopologySpreadFilter.PreFilter(context.TODO(), state, task.Pod)
-		if !status.IsSuccess() {
-			return fmt.Errorf("plugin %s pre-predicates failed %s", podTopologySpreadFilter.Name(), status.Message())
-		}
 		status = podTopologySpreadFilter.Filter(context.TODO(), state, task.Pod, nodeInfo)
 		if !status.IsSuccess() {
 			return fmt.Errorf("plugin %s predicates failed %s", podTopologySpreadFilter.Name(), status.Message())
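
Usage note (not part of the patch): the hook above lets any plugin register node-independent checks that run once per task, before the per-node PredicateFn loop. A minimal sketch of an out-of-tree plugin wiring into it follows; the plugin name, struct, and the nil-pod check are hypothetical and only illustrate the AddPrePredicateFn signature and the api/framework types introduced above.

package example

import (
	"fmt"

	"volcano.sh/volcano/pkg/scheduler/api"
	"volcano.sh/volcano/pkg/scheduler/framework"
)

// examplePlugin is a hypothetical plugin used only to demonstrate the new hook.
type examplePlugin struct{}

// New builds the plugin; in-tree plugins are wired up via framework.RegisterPluginBuilder.
func New(arguments framework.Arguments) framework.Plugin {
	return &examplePlugin{}
}

func (ep *examplePlugin) Name() string { return "example" }

func (ep *examplePlugin) OnSessionOpen(ssn *framework.Session) {
	// PrePredicateFn runs once per task, before the per-node PredicateFn loop,
	// so it is the place for checks that do not depend on a particular node.
	ssn.AddPrePredicateFn(ep.Name(), func(task *api.TaskInfo) error {
		if task.Pod == nil {
			return fmt.Errorf("task %s/%s has no pod spec", task.Namespace, task.Name)
		}
		return nil
	})
}

func (ep *examplePlugin) OnSessionClose(ssn *framework.Session) {}

Per the Session.PrePredicateFn loop in session_plugins.go above, the registered pre-predicate is only invoked when the plugin's predicate option (EnabledPredicate) is enabled in the tier configuration.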