From a2c5a5c049210459a7a2451952eeb3c42dc516f4 Mon Sep 17 00:00:00 2001 From: gj199575 <409237405@qq.com> Date: Mon, 22 May 2023 21:34:19 +0800 Subject: [PATCH] fix synchronous pvc/pv bind error when volcano schedule pod with pvc Signed-off-by: gj199575 <409237405@qq.com> --- pkg/scheduler/cache/cache.go | 53 +++++++++++++++++------------------- 1 file changed, 25 insertions(+), 28 deletions(-) diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 7be86e22d62..c459ecbb342 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -138,7 +138,6 @@ type SchedulerCache struct { vcInformerFactory vcinformer.SharedInformerFactory BindFlowChannel chan *schedulingapi.TaskInfo - bindCache []*schedulingapi.TaskInfo batchNum int // A map from image name to its imageState. @@ -768,24 +767,22 @@ func (sc *SchedulerCache) Evict(taskInfo *schedulingapi.TaskInfo, reason string) } // Bind binds task to the target host. -func (sc *SchedulerCache) Bind(tasks []*schedulingapi.TaskInfo) error { - go func(taskArray []*schedulingapi.TaskInfo) { - tmp := time.Now() - errTasks, err := sc.Binder.Bind(sc.kubeClient, taskArray) - if err == nil { - klog.V(3).Infof("bind ok, latency %v", time.Since(tmp)) - for _, task := range tasks { - sc.Recorder.Eventf(task.Pod, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v", - task.Namespace, task.Name, task.NodeName) - } - } else { - for _, task := range errTasks { - klog.V(2).Infof("resyncTask task %s", task.Name) - sc.VolumeBinder.RevertVolumes(task, task.PodVolumes) - sc.resyncTask(task) - } +func (sc *SchedulerCache) Bind(taskArray []*schedulingapi.TaskInfo) error { + tmp := time.Now() + errTasks, err := sc.Binder.Bind(sc.kubeClient, taskArray) + if err == nil { + klog.V(3).Infof("bind ok, latency %v", time.Since(tmp)) + for _, task := range taskArray { + sc.Recorder.Eventf(task.Pod, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v", + task.Namespace, task.Name, task.NodeName) + } + } else { + for _, task := range errTasks { + klog.V(2).Infof("resyncTask task %s", task.Name) + sc.VolumeBinder.RevertVolumes(task, task.PodVolumes) + sc.resyncTask(task) } - }(tasks) + } return nil } @@ -988,6 +985,7 @@ func (sc *SchedulerCache) AddBindTask(taskInfo *schedulingapi.TaskInfo) error { } func (sc *SchedulerCache) processBindTask() { + var bindCache []*schedulingapi.TaskInfo = make([]*schedulingapi.TaskInfo, 10) for { select { case taskInfo, ok := <-sc.BindFlowChannel: @@ -995,9 +993,9 @@ func (sc *SchedulerCache) processBindTask() { return } - sc.bindCache = append(sc.bindCache, taskInfo) - if len(sc.bindCache) == sc.batchNum { - sc.BindTask() + bindCache = append(bindCache, taskInfo) + if len(bindCache) == sc.batchNum { + go sc.BindTask(bindCache) } default: } @@ -1007,17 +1005,18 @@ func (sc *SchedulerCache) processBindTask() { } } - if len(sc.bindCache) == 0 { + if len(bindCache) == 0 { return } - sc.BindTask() + go sc.BindTask(bindCache) } -func (sc *SchedulerCache) BindTask() { - klog.V(5).Infof("batch bind task count %d", len(sc.bindCache)) +func (sc *SchedulerCache) BindTask(bindCache []*schedulingapi.TaskInfo) { + klog.V(5).Infof("batch bind task count %d", len(bindCache)) + successfulTasks := make([]*schedulingapi.TaskInfo, 0) - for _, task := range sc.bindCache { + for _, task := range bindCache { if err := sc.VolumeBinder.BindVolumes(task, task.PodVolumes); err != nil { klog.Errorf("task %s/%s bind Volumes failed: %#v", task.Namespace, task.Name, err) sc.VolumeBinder.RevertVolumes(task, task.PodVolumes) @@ -1038,8 +1037,6 @@ func (sc *SchedulerCache) BindTask() { for _, task := range successfulTasks { metrics.UpdateTaskScheduleDuration(metrics.Duration(task.Pod.CreationTimestamp.Time)) } - - sc.bindCache = sc.bindCache[0:0] } // Snapshot returns the complete snapshot of the cluster from cache