Support pause MCP on SNO
This commit allows the daemon to still pause the MCP when running on OCP.
This is needed when the cluster has only one node and disableDrain is set to true.

Signed-off-by: Sebastian Sch <sebassch@gmail.com>
SchSeba committed Dec 7, 2021
1 parent 3846b8b commit 616062f
Showing 1 changed file with 104 additions and 82 deletions.
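Note: the pause/resume in this change is implemented as a JSON merge patch against the node's MachineConfigPool. Below is a minimal standalone sketch of that call, assuming the machine-config-operator clientset import path of this era; the package and helper name are illustrative, not part of this commit:

package sketch

import (
    "context"

    mcfgclient "github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
)

// setMCPPaused pauses (true) or resumes (false) a MachineConfigPool with a
// JSON merge patch, mirroring the pausePatch calls in the diff below.
func setMCPPaused(mcClient mcfgclient.Interface, mcpName string, paused bool) error {
    patch := []byte(`{"spec":{"paused":true}}`)
    if !paused {
        patch = []byte(`{"spec":{"paused":false}}`)
    }
    _, err := mcClient.MachineconfigurationV1().MachineConfigPools().Patch(
        context.Background(), mcpName, types.MergePatchType, patch, metav1.PatchOptions{})
    return err
}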
186 changes: 104 additions & 82 deletions pkg/daemon/daemon.go
@@ -497,11 +497,26 @@ func (dn *Daemon) nodeStateSyncHandler(generation int64) error {
         }
     }
 
-    if reqDrain && !dn.disableDrain {
-        glog.Info("nodeStateSyncHandler(): drain node")
-        if err := dn.drainNode(dn.name); err != nil {
-            return err
-        }
-    }
+    if reqDrain {
+        ctx, cancel := context.WithCancel(context.TODO())
+        defer cancel()
+
+        glog.Infof("nodeStateSyncHandler(): get drain lock for sriov daemon")
+        done := make(chan bool)
+        go dn.getDrainLock(ctx, done)
+        <-done
+
+        glog.Infof("nodeStateSyncHandler(): pause MCP")
+        if err := dn.pauseMCP(); err != nil {
+            return err
+        }
+
+        if !dn.disableDrain {
+            glog.Info("nodeStateSyncHandler(): drain node")
+            if err := dn.drainNode(); err != nil {
+                return err
+            }
+        }
+    }
 
     if !reqReboot {
@@ -551,8 +566,10 @@ func (dn *Daemon) nodeStateSyncHandler(generation int64) error {
 }
 
 func (dn *Daemon) completeDrain() error {
-    if err := drain.RunCordonOrUncordon(dn.drainer, dn.node, false); err != nil {
-        return err
+    if !dn.disableDrain {
+        if err := drain.RunCordonOrUncordon(dn.drainer, dn.node, false); err != nil {
+            return err
+        }
     }
 
     if utils.ClusterType == utils.ClusterTypeOpenshift {
@@ -732,7 +749,7 @@ func (dn *Daemon) annotateNode(node, value string) error {
 func (dn *Daemon) getNodeMachinePool() error {
     desiredConfig, ok := dn.node.Annotations[daemonconsts.DesiredMachineConfigAnnotationKey]
     if !ok {
-        glog.Error("getNodeMachinePool(): Failed to find the the desiredConfig Annotation")
+        glog.Errorf("getNodeMachinePool(): Failed to find the the desiredConfig Annotation")
         return fmt.Errorf("getNodeMachinePool(): Failed to find the the desiredConfig Annotation")
     }
     mc, err := dn.mcClient.MachineconfigurationV1().MachineConfigs().Get(context.TODO(), desiredConfig, metav1.GetOptions{})
@@ -801,99 +818,104 @@ func (dn *Daemon) getDrainLock(ctx context.Context, done chan bool) {
     })
 }
 
-func (dn *Daemon) drainNode(name string) error {
-    glog.Info("drainNode(): Update prepared")
+func (dn *Daemon) pauseMCP() error {
+    glog.Info("pauseMCP(): check if pausing MCP is possible")
     var err error
-    ctx, cancel := context.WithCancel(context.TODO())
-    defer cancel()
 
-    done := make(chan bool)
-    go dn.getDrainLock(ctx, done)
-    <-done
+    if utils.ClusterType != utils.ClusterTypeOpenshift {
+        glog.Infof("pauseMCP(): skipping MCP pause as the cluster is not an openshift cluster")
+        return nil
+    }
 
-    if utils.ClusterType == utils.ClusterTypeOpenshift {
-        mcpInformerFactory := mcfginformers.NewSharedInformerFactory(dn.mcClient,
-            time.Second*30,
-        )
-        mcpInformer := mcpInformerFactory.Machineconfiguration().V1().MachineConfigPools().Informer()
+    mcpInformerFactory := mcfginformers.NewSharedInformerFactory(dn.mcClient,
+        time.Second*30,
+    )
+    mcpInformer := mcpInformerFactory.Machineconfiguration().V1().MachineConfigPools().Informer()
 
-        ctx, cancel := context.WithCancel(context.TODO())
-        defer cancel()
-        paused := dn.node.Annotations[annoKey] == annoMcpPaused
+    ctx, cancel := context.WithCancel(context.TODO())
+    defer cancel()
+    paused := dn.node.Annotations[annoKey] == annoMcpPaused
 
-        mcpEventHandler := func(obj interface{}) {
-            mcp := obj.(*mcfgv1.MachineConfigPool)
-            if mcp.GetName() != dn.mcpName {
-                return
-            }
-            // Always get the latest object
-            newMcp, err := dn.mcClient.MachineconfigurationV1().MachineConfigPools().Get(ctx, dn.mcpName, metav1.GetOptions{})
-            if err != nil {
-                glog.V(2).Infof("drainNode(): Failed to get MCP %s: %v", dn.mcpName, err)
-                return
-            }
-            if mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolDegraded) &&
-                mcfgv1.IsMachineConfigPoolConditionTrue(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdated) &&
-                mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdating) {
-                glog.V(2).Infof("drainNode(): MCP %s is ready", dn.mcpName)
-                if paused {
-                    glog.V(2).Info("drainNode(): stop MCP informer")
-                    cancel()
-                    return
-                }
-                if newMcp.Spec.Paused {
-                    glog.V(2).Infof("drainNode(): MCP %s was paused by other, wait...", dn.mcpName)
-                    return
-                }
-                glog.Infof("drainNode(): pause MCP %s", dn.mcpName)
-                pausePatch := []byte("{\"spec\":{\"paused\":true}}")
-                _, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{})
-                if err != nil {
-                    glog.V(2).Infof("drainNode(): Failed to pause MCP %s: %v", dn.mcpName, err)
-                    return
-                }
-                err = dn.annotateNode(dn.name, annoMcpPaused)
-                if err != nil {
-                    glog.V(2).Infof("drainNode(): Failed to annotate node: %v", err)
-                    return
-                }
-                paused = true
-                return
-            }
-            if paused {
-                glog.Infof("drainNode(): MCP is processing, resume MCP %s", dn.mcpName)
-                pausePatch := []byte("{\"spec\":{\"paused\":false}}")
-                _, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{})
-                if err != nil {
-                    glog.V(2).Infof("drainNode(): fail to resume MCP %s: %v", dn.mcpName, err)
-                    return
-                }
-                err = dn.annotateNode(dn.name, annoDraining)
-                if err != nil {
-                    glog.V(2).Infof("drainNode(): Failed to annotate node: %v", err)
-                    return
-                }
-                paused = false
-            }
-            glog.Infof("drainNode():MCP %s is not ready: %v, wait...", newMcp.GetName(), newMcp.Status.Conditions)
-        }
+    mcpEventHandler := func(obj interface{}) {
+        mcp := obj.(*mcfgv1.MachineConfigPool)
+        if mcp.GetName() != dn.mcpName {
+            return
+        }
+        // Always get the latest object
+        newMcp := &mcfgv1.MachineConfigPool{}
+        newMcp, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Get(ctx, dn.mcpName, metav1.GetOptions{})
+        if err != nil {
+            glog.V(2).Infof("pauseMCP(): Failed to get MCP %s: %v", dn.mcpName, err)
+            return
+        }
+        if mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolDegraded) &&
+            mcfgv1.IsMachineConfigPoolConditionTrue(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdated) &&
+            mcfgv1.IsMachineConfigPoolConditionFalse(newMcp.Status.Conditions, mcfgv1.MachineConfigPoolUpdating) {
+            glog.V(2).Infof("pauseMCP(): MCP %s is ready", dn.mcpName)
+            if paused {
+                glog.V(2).Info("pauseMCP(): stop MCP informer")
+                cancel()
+                return
+            }
+            if newMcp.Spec.Paused {
+                glog.V(2).Infof("pauseMCP(): MCP %s was paused by other, wait...", dn.mcpName)
+                return
+            }
+            glog.Infof("pauseMCP(): pause MCP %s", dn.mcpName)
+            pausePatch := []byte("{\"spec\":{\"paused\":true}}")
+            _, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{})
+            if err != nil {
+                glog.V(2).Infof("pauseMCP(): Failed to pause MCP %s: %v", dn.mcpName, err)
+                return
+            }
+            err = dn.annotateNode(dn.name, annoMcpPaused)
+            if err != nil {
+                glog.V(2).Infof("pauseMCP(): Failed to annotate node: %v", err)
+                return
+            }
+            paused = true
+            return
+        }
+        if paused {
+            glog.Infof("pauseMCP(): MCP is processing, resume MCP %s", dn.mcpName)
+            pausePatch := []byte("{\"spec\":{\"paused\":false}}")
+            _, err = dn.mcClient.MachineconfigurationV1().MachineConfigPools().Patch(context.Background(), dn.mcpName, types.MergePatchType, pausePatch, metav1.PatchOptions{})
+            if err != nil {
+                glog.V(2).Infof("pauseMCP(): fail to resume MCP %s: %v", dn.mcpName, err)
+                return
+            }
+            err = dn.annotateNode(dn.name, annoDraining)
+            if err != nil {
+                glog.V(2).Infof("pauseMCP(): Failed to annotate node: %v", err)
+                return
+            }
+            paused = false
+        }
+        glog.Infof("pauseMCP():MCP %s is not ready: %v, wait...", newMcp.GetName(), newMcp.Status.Conditions)
+    }
 
-        mcpInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
-            AddFunc: mcpEventHandler,
-            UpdateFunc: func(old, new interface{}) {
-                mcpEventHandler(new)
-            },
-        })
+    mcpInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+        AddFunc: mcpEventHandler,
+        UpdateFunc: func(old, new interface{}) {
+            mcpEventHandler(new)
+        },
+    })
 
-        // The Draining_MCP_Paused state means the MCP work has been paused by the config daemon in previous round.
-        // Only check MCP state if the node is not in Draining_MCP_Paused state
-        if !paused {
-            mcpInformerFactory.Start(ctx.Done())
-            mcpInformerFactory.WaitForCacheSync(ctx.Done())
-            <-ctx.Done()
-        }
-    }
+    // The Draining_MCP_Paused state means the MCP work has been paused by the config daemon in previous round.
+    // Only check MCP state if the node is not in Draining_MCP_Paused state
+    if !paused {
+        mcpInformerFactory.Start(ctx.Done())
+        mcpInformerFactory.WaitForCacheSync(ctx.Done())
+        <-ctx.Done()
+    }
+
+    return err
+}
+
+func (dn *Daemon) drainNode() error {
+    glog.Info("drainNode(): Update prepared")
+    var err error
 
     backoff := wait.Backoff{
         Steps:    5,
         Duration: 10 * time.Second,
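The new pauseMCP() above follows a common client-go waiting pattern: register an informer event handler and cancel a context once the watched object reaches the desired state, which unblocks the goroutine parked on ctx.Done(). A stripped-down sketch of just that pattern, under the same import-path assumptions as the sketch above; waitForMCP and isReady are illustrative names, not operator code:

package sketch

import (
    "context"
    "time"

    mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1"
    mcfgclient "github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned"
    mcfginformers "github.com/openshift/machine-config-operator/pkg/generated/informers/externalversions"
    "k8s.io/client-go/tools/cache"
)

// waitForMCP blocks until isReady reports true for the named MachineConfigPool,
// using the same informer-plus-cancel technique as pauseMCP().
func waitForMCP(mcClient mcfgclient.Interface, mcpName string, isReady func(*mcfgv1.MachineConfigPool) bool) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    factory := mcfginformers.NewSharedInformerFactory(mcClient, 30*time.Second)
    informer := factory.Machineconfiguration().V1().MachineConfigPools().Informer()

    handler := func(obj interface{}) {
        mcp, ok := obj.(*mcfgv1.MachineConfigPool)
        if !ok || mcp.GetName() != mcpName {
            return
        }
        if isReady(mcp) {
            cancel() // desired state reached; unblock the wait below
        }
    }
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    handler,
        UpdateFunc: func(_, newObj interface{}) { handler(newObj) },
    })

    factory.Start(ctx.Done())
    factory.WaitForCacheSync(ctx.Done())
    <-ctx.Done() // returns once cancel() fires
}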