Skip to content

Commit

Permalink
fix synchronous pvc/pv bind error when volcano schedule pod with pvc
Browse files Browse the repository at this point in the history
Signed-off-by: gj199575 <409237405@qq.com>
  • Loading branch information
gj409237405 committed May 23, 2023
1 parent 4224c9c commit a2c5a5c
Showing 1 changed file with 25 additions and 28 deletions.
53 changes: 25 additions & 28 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ type SchedulerCache struct {
vcInformerFactory vcinformer.SharedInformerFactory

BindFlowChannel chan *schedulingapi.TaskInfo
bindCache []*schedulingapi.TaskInfo
batchNum int

// A map from image name to its imageState.
Expand Down Expand Up @@ -768,24 +767,22 @@ func (sc *SchedulerCache) Evict(taskInfo *schedulingapi.TaskInfo, reason string)
}

// Bind binds task to the target host.
func (sc *SchedulerCache) Bind(tasks []*schedulingapi.TaskInfo) error {
go func(taskArray []*schedulingapi.TaskInfo) {
tmp := time.Now()
errTasks, err := sc.Binder.Bind(sc.kubeClient, taskArray)
if err == nil {
klog.V(3).Infof("bind ok, latency %v", time.Since(tmp))
for _, task := range tasks {
sc.Recorder.Eventf(task.Pod, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v",
task.Namespace, task.Name, task.NodeName)
}
} else {
for _, task := range errTasks {
klog.V(2).Infof("resyncTask task %s", task.Name)
sc.VolumeBinder.RevertVolumes(task, task.PodVolumes)
sc.resyncTask(task)
}
func (sc *SchedulerCache) Bind(taskArray []*schedulingapi.TaskInfo) error {
tmp := time.Now()
errTasks, err := sc.Binder.Bind(sc.kubeClient, taskArray)
if err == nil {
klog.V(3).Infof("bind ok, latency %v", time.Since(tmp))
for _, task := range taskArray {
sc.Recorder.Eventf(task.Pod, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v",
task.Namespace, task.Name, task.NodeName)
}
} else {
for _, task := range errTasks {
klog.V(2).Infof("resyncTask task %s", task.Name)
sc.VolumeBinder.RevertVolumes(task, task.PodVolumes)
sc.resyncTask(task)
}
}(tasks)
}
return nil
}

Expand Down Expand Up @@ -988,16 +985,17 @@ func (sc *SchedulerCache) AddBindTask(taskInfo *schedulingapi.TaskInfo) error {
}

func (sc *SchedulerCache) processBindTask() {
var bindCache []*schedulingapi.TaskInfo = make([]*schedulingapi.TaskInfo, 10)
for {
select {
case taskInfo, ok := <-sc.BindFlowChannel:
if !ok {
return
}

sc.bindCache = append(sc.bindCache, taskInfo)
if len(sc.bindCache) == sc.batchNum {
sc.BindTask()
bindCache = append(bindCache, taskInfo)
if len(bindCache) == sc.batchNum {
go sc.BindTask(bindCache)
}
default:
}
Expand All @@ -1007,17 +1005,18 @@ func (sc *SchedulerCache) processBindTask() {
}
}

if len(sc.bindCache) == 0 {
if len(bindCache) == 0 {
return
}

sc.BindTask()
go sc.BindTask(bindCache)
}

func (sc *SchedulerCache) BindTask() {
klog.V(5).Infof("batch bind task count %d", len(sc.bindCache))
func (sc *SchedulerCache) BindTask(bindCache []*schedulingapi.TaskInfo) {
klog.V(5).Infof("batch bind task count %d", len(bindCache))

successfulTasks := make([]*schedulingapi.TaskInfo, 0)
for _, task := range sc.bindCache {
for _, task := range bindCache {
if err := sc.VolumeBinder.BindVolumes(task, task.PodVolumes); err != nil {
klog.Errorf("task %s/%s bind Volumes failed: %#v", task.Namespace, task.Name, err)
sc.VolumeBinder.RevertVolumes(task, task.PodVolumes)
Expand All @@ -1038,8 +1037,6 @@ func (sc *SchedulerCache) BindTask() {
for _, task := range successfulTasks {
metrics.UpdateTaskScheduleDuration(metrics.Duration(task.Pod.CreationTimestamp.Time))
}

sc.bindCache = sc.bindCache[0:0]
}

// Snapshot returns the complete snapshot of the cluster from cache
Expand Down

0 comments on commit a2c5a5c

Please sign in to comment.