Skip to content

Commit

Permalink
Add mountpod filter in mount manager predicates (#1047)
Browse files: browse the repository at this point in the history
* improve error logging for mountpod fuse connection
* fix: add mountpod filter in mount manager predicates

Signed-off-by: Xuhui zhang <xuhui@juicedata.io>
  • Loading branch information
zxh326 committed Jul 24, 2024
1 parent 3e81a8d commit 5acf5bd
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 6 deletions.
9 changes: 9 additions & 0 deletions pkg/controller/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,17 @@ func (m *PodController) SetupWithManager(mgr ctrl.Manager) error {
return c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}, predicate.Funcs{
CreateFunc: func(event event.CreateEvent) bool {
pod := event.Object.(*corev1.Pod)
if pod.Spec.NodeName != config.NodeName && pod.Spec.NodeSelector["kubernetes.io/hostname"] != config.NodeName {
return false
}
klog.V(6).Infof("watch pod %s created", pod.GetName())
return true
},
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
podNew, ok := updateEvent.ObjectNew.(*corev1.Pod)
if podNew.Spec.NodeName != config.NodeName && podNew.Spec.NodeSelector["kubernetes.io/hostname"] != config.NodeName {
return false
}
klog.V(6).Infof("watch pod %s updated", podNew.GetName())
if !ok {
klog.V(6).Infof("pod.onUpdateFunc Skip object: %v", updateEvent.ObjectNew)
Expand All @@ -155,6 +161,9 @@ func (m *PodController) SetupWithManager(mgr ctrl.Manager) error {
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
pod := deleteEvent.Object.(*corev1.Pod)
if pod.Spec.NodeName != config.NodeName && pod.Spec.NodeSelector["kubernetes.io/hostname"] != config.NodeName {
return false
}
klog.V(6).Infof("watch pod %s deleted", pod.GetName())
return true
},
Expand Down
12 changes: 6 additions & 6 deletions pkg/controller/pod_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,10 +763,10 @@ func (p *PodDriver) checkMountPodStuck(pod *corev1.Pod) {
if runtime.GOOS == "linux" {
if devMinor, ok := util.DevMinorTableLoad(mountPoint); ok {
if err := p.doAbortFuse(pod, devMinor); err != nil {
klog.Errorf("abort fuse connection error: %v", err)
klog.Errorf("mountpod: %s, abort fuse connection error: %v", pod.Name, err)
}
} else {
klog.Errorf("can't find devMinor of mountPoint %s", mountPoint)
klog.Errorf("mountpod: %s, can't find devMinor of mountPoint %s", pod.Name, mountPoint)
}
}
return
Expand All @@ -783,7 +783,7 @@ func (p *PodDriver) checkMountPodStuck(pod *corev1.Pod) {
func (p *PodDriver) doAbortFuse(mountpod *corev1.Pod, devMinor uint32) error {
job := builder.NewFuseAbortJob(mountpod, devMinor)
if _, err := p.Client.CreateJob(context.Background(), job); err != nil {
klog.Errorf("create fuse abort job error: %v", err)
klog.Errorf("mountpod: %s, create fuse abort job error: %v", mountpod.Name, err)
return err
}

Expand All @@ -792,20 +792,20 @@ func (p *PodDriver) doAbortFuse(mountpod *corev1.Pod, devMinor uint32) error {
defer cancel()
for {
if waitCtx.Err() == context.Canceled || waitCtx.Err() == context.DeadlineExceeded {
klog.Errorf("fuse abort job %s/%s timeout", job.Namespace, job.Name)
klog.Errorf("mountpod: %s, fuse abort job %s/%s timeout", mountpod.Name, job.Namespace, job.Name)
break
}
job, err := p.Client.GetJob(waitCtx, job.Name, job.Namespace)
if apierrors.IsNotFound(err) {
break
}
if err != nil {
klog.Errorf("get fuse abort job %s/%s error: %v", err, job.Namespace, job.Name)
klog.Errorf("mountpod: %s, get fuse abort job %s/%s error: %v", mountpod.Name, err, job.Namespace, job.Name)
time.Sleep(10 * time.Second)
continue
}
if resource.IsJobCompleted(job) {
klog.V(5).Infof("fuse abort job %s/%s completed", job.Namespace, job.Name)
klog.V(5).Infof("mountpod: %s, fuse abort job %s/%s completed", mountpod.Name, job.Namespace, job.Name)
break
}
time.Sleep(10 * time.Second)
Expand Down

0 comments on commit 5acf5bd

Please sign in to comment.