diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index 6a9f5c1aa..138b3fb6a 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -497,11 +497,26 @@ func (dn *Daemon) nodeStateSyncHandler(generation int64) error { } } - if reqDrain && !dn.disableDrain { - glog.Info("nodeStateSyncHandler(): drain node") - if err := dn.drainNode(dn.name); err != nil { + if reqDrain { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + glog.Infof("nodeStateSyncHandler(): get drain lock for sriov daemon") + done := make(chan bool) + go dn.getDrainLock(ctx, done) + <-done + + glog.Infof("nodeStateSyncHandler(): pause MCP") + if err := dn.pauseMCP(); err != nil { return err } + + if !dn.disableDrain { + glog.Info("nodeStateSyncHandler(): drain node") + if err := dn.drainNode(); err != nil { + return err + } + } } if !reqReboot { @@ -551,8 +566,10 @@ func (dn *Daemon) nodeStateSyncHandler(generation int64) error { } func (dn *Daemon) completeDrain() error { - if err := drain.RunCordonOrUncordon(dn.drainer, dn.node, false); err != nil { - return err + if !dn.disableDrain { + if err := drain.RunCordonOrUncordon(dn.drainer, dn.node, false); err != nil { + return err + } } if utils.ClusterType == utils.ClusterTypeOpenshift { @@ -732,7 +749,7 @@ func (dn *Daemon) annotateNode(node, value string) error { func (dn *Daemon) getNodeMachinePool() error { desiredConfig, ok := dn.node.Annotations[daemonconsts.DesiredMachineConfigAnnotationKey] if !ok { - glog.Error("getNodeMachinePool(): Failed to find the the desiredConfig Annotation") + glog.Errorf("getNodeMachinePool(): Failed to find the the desiredConfig Annotation") return fmt.Errorf("getNodeMachinePool(): Failed to find the the desiredConfig Annotation") } mc, err := dn.mcClient.MachineconfigurationV1().MachineConfigs().Get(context.TODO(), desiredConfig, metav1.GetOptions{}) @@ -801,99 +818,104 @@ func (dn *Daemon) getDrainLock(ctx context.Context, done chan bool) { }) } -func (dn *Daemon) drainNode(name string) error { - glog.Info("drainNode(): Update prepared") +func (dn *Daemon) pauseMCP() error { + glog.Info("pauseMCP(): check if pausing MCP is possible") var err error - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - done := make(chan bool) - go dn.getDrainLock(ctx, done) - <-done + if utils.ClusterType != utils.ClusterTypeOpenshift { + glog.Infof("pauseMCP(): skipping MCP pause as the cluster is not an openshift cluster") + return nil + } - if utils.ClusterType == utils.ClusterTypeOpenshift { - mcpInformerFactory := mcfginformers.NewSharedInformerFactory(dn.mcClient, - time.Second*30, - ) - mcpInformer := mcpInformerFactory.Machineconfiguration().V1().MachineConfigPools().Informer() + mcpInformerFactory := mcfginformers.NewSharedInformerFactory(dn.mcClient, + time.Second*30, + ) + mcpInformer := mcpInformerFactory.Machineconfiguration().V1().MachineConfigPools().Informer() - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - paused := dn.node.Annotations[annoKey] == annoMcpPaused + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + paused := dn.node.Annotations[annoKey] == annoMcpPaused - mcpEventHandler := func(obj interface{}) { - mcp := obj.(*mcfgv1.MachineConfigPool) - if mcp.GetName() != dn.mcpName { + mcpEventHandler := func(obj interface{}) { + mcp := obj.(*mcfgv1.MachineConfigPool) + if mcp.GetName() != dn.mcpName { + return + } + // Always get the latest object + newMcp := &mcfgv1.MachineConfigPool{} + newMcp, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Get(ctx, dn.mcpName, metav1.GetOptions{}) + if err != nil { + glog.V(2).Infof("pauseMCP(): Failed to get MCP %s: %v", dn.mcpName, err) + return + } + if mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolDegraded) && + mcfgv1.IsMachineConfigPoolConditionTrue(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdated) && + mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdating) { + glog.V(2).Infof("pauseMCP(): MCP %s is ready", dn.mcpName) + if paused { + glog.V(2).Info("pauseMCP(): stop MCP informer") + cancel() return } - // Always get the latest object - newMcp, err := dn.mcClient.MachineconfigurationV1().MachineConfigPools().Get(ctx, dn.mcpName, metav1.GetOptions{}) + if newMcp.Spec.Paused { + glog.V(2).Infof("pauseMCP(): MCP %s was paused by other, wait...", dn.mcpName) + return + } + glog.Infof("pauseMCP(): pause MCP %s", dn.mcpName) + pausePatch := []byte("{\"spec\":{\"paused\":true}}") + _, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{}) if err != nil { - glog.V(2).Infof("drainNode(): Failed to get MCP %s: %v", dn.mcpName, err) + glog.V(2).Infof("pauseMCP(): Failed to pause MCP %s: %v", dn.mcpName, err) return } - if mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolDegraded) && - mcfgv1.IsMachineConfigPoolConditionTrue(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdated) && - mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdating) { - glog.V(2).Infof("drainNode(): MCP %s is ready", dn.mcpName) - if paused { - glog.V(2).Info("drainNode(): stop MCP informer") - cancel() - return - } - if newMcp.Spec.Paused { - glog.V(2).Infof("drainNode(): MCP %s was paused by other, wait...", dn.mcpName) - return - } - glog.Infof("drainNode(): pause MCP %s", dn.mcpName) - pausePatch := []byte("{\"spec\":{\"paused\":true}}") - _, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{}) - if err != nil { - glog.V(2).Infof("drainNode(): Failed to pause MCP %s: %v", dn.mcpName, err) - return - } - err = dn.annotateNode(dn.name, annoMcpPaused) - if err != nil { - glog.V(2).Infof("drainNode(): Failed to annotate node: %v", err) - return - } - paused = true + err = dn.annotateNode(dn.name, annoMcpPaused) + if err != nil { + glog.V(2).Infof("pauseMCP(): Failed to annotate node: %v", err) return } - if paused { - glog.Infof("drainNode(): MCP is processing, resume MCP %s", dn.mcpName) - pausePatch := []byte("{\"spec\":{\"paused\":false}}") - _, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{}) - if err != nil { - glog.V(2).Infof("drainNode(): fail to resume MCP %s: %v", dn.mcpName, err) - return - } - err = dn.annotateNode(dn.name, annoDraining) - if err != nil { - glog.V(2).Infof("drainNode(): Failed to annotate node: %v", err) - return - } - paused = false + paused = true + return + } + if paused { + glog.Infof("pauseMCP(): MCP is processing, resume MCP %s", dn.mcpName) + pausePatch := []byte("{\"spec\":{\"paused\":false}}") + _, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{}) + if err != nil { + glog.V(2).Infof("pauseMCP(): fail to resume MCP %s: %v", dn.mcpName, err) + return } - glog.Infof("drainNode():MCP %s is not ready: %v, wait...", newMcp.GetName(), newMcp.Status.Conditions) + err = dn.annotateNode(dn.name, annoDraining) + if err != nil { + glog.V(2).Infof("pauseMCP(): Failed to annotate node: %v", err) + return + } + paused = false } + glog.Infof("pauseMCP():MCP %s is not ready: %v, wait...", newMcp.GetName(), newMcp.Status.Conditions) + } - mcpInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: mcpEventHandler, - UpdateFunc: func(old, new interface{}) { - mcpEventHandler(new) - }, - }) - - // The Draining_MCP_Paused state means the MCP work has been paused by the config daemon in previous round. - // Only check MCP state if the node is not in Draining_MCP_Paused state - if !paused { - mcpInformerFactory.Start(ctx.Done()) - mcpInformerFactory.WaitForCacheSync(ctx.Done()) - <-ctx.Done() - } + mcpInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: mcpEventHandler, + UpdateFunc: func(old, new interface{}) { + mcpEventHandler(new) + }, + }) + + // The Draining_MCP_Paused state means the MCP work has been paused by the config daemon in previous round. + // Only check MCP state if the node is not in Draining_MCP_Paused state + if !paused { + mcpInformerFactory.Start(ctx.Done()) + mcpInformerFactory.WaitForCacheSync(ctx.Done()) + <-ctx.Done() } + return err +} + +func (dn *Daemon) drainNode() error { + glog.Info("drainNode(): Update prepared") + var err error + backoff := wait.Backoff{ Steps: 5, Duration: 10 * time.Second,