Skip to content

Commit

Permalink
Fix comments and rebase
Browse files Browse the repository at this point in the history
Signed-off-by: Sebastian Sch <sebassch@gmail.com>
  • Loading branch information
SchSeba committed Feb 28, 2024
1 parent 2b8f076 commit 05a6158
Show file tree
Hide file tree
Showing 17 changed files with 265 additions and 239 deletions.
135 changes: 52 additions & 83 deletions controllers/drain_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package controllers
import (
"context"
"fmt"
"os"
"sync"
"time"

Expand Down Expand Up @@ -50,23 +49,15 @@ import (

type DrainReconcile struct {
client.Client
Scheme *runtime.Scheme
recorder record.EventRecorder
drainer drain.DrainInterface
resourcePrefix string

nodesInReconcile map[string]interface{}
nodesInReconcileMutex sync.Mutex
drainCheckMutex sync.Mutex
Scheme *runtime.Scheme
recorder record.EventRecorder
drainer drain.DrainInterface

drainCheckMutex sync.Mutex
}

func NewDrainReconcileController(client client.Client, Scheme *runtime.Scheme, recorder record.EventRecorder, platformHelper platforms.Interface) (*DrainReconcile, error) {
resourcePrefix := os.Getenv("RESOURCE_PREFIX")
if resourcePrefix == "" {
return nil, fmt.Errorf("RESOURCE_PREFIX environment variable can't be empty")
}

drainer, err := drain.NewDrainer(resourcePrefix, platformHelper)
drainer, err := drain.NewDrainer(platformHelper)
if err != nil {
return nil, err
}
Expand All @@ -76,30 +67,8 @@ func NewDrainReconcileController(client client.Client, Scheme *runtime.Scheme, r
Scheme,
recorder,
drainer,
resourcePrefix,
map[string]interface{}{},
sync.Mutex{},
sync.Mutex{}}, nil
}
func (dr *DrainReconcile) TryLockNode(nodeName string) bool {
dr.nodesInReconcileMutex.Lock()
defer dr.nodesInReconcileMutex.Unlock()

_, exist := dr.nodesInReconcile[nodeName]
if exist {
return false
}

dr.nodesInReconcile[nodeName] = nil
return true
}

func (dr *DrainReconcile) unlockNode(nodeName string) {
dr.nodesInReconcileMutex.Lock()
defer dr.nodesInReconcileMutex.Unlock()

delete(dr.nodesInReconcile, nodeName)
}

//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch;update;patch
//+kubebuilder:rbac:groups=sriovnetwork.openshift.io,resources=sriovnodestates,verbs=get;list;watch
Expand All @@ -110,74 +79,41 @@ func (dr *DrainReconcile) unlockNode(nodeName string) {
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.8.3/pkg/reconcile
func (dr *DrainReconcile) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// try to lock the node to this reconcile loop.
// if we are not able this means there is another loop already handling this node, so we just exist
if !dr.TryLockNode(req.Name) {
return ctrl.Result{}, nil
}

// configure logs
reqLogger := log.FromContext(ctx)
reqLogger.Info("Reconciling Drain")
reqLogger.V(2).Info("node locked for drain controller", "nodeName", req.Name)

// we send to another function so the operator can have a defer function to release the lock
return dr.reconcile(ctx, req)
}

func (dr *DrainReconcile) reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// remove the lock when we exist the function
defer dr.unlockNode(req.Name)

req.Namespace = vars.Namespace

// configure logs
reqLogger := log.FromContext(ctx)

// get node object
node := &corev1.Node{}
err := dr.Get(ctx, req.NamespacedName, node)
err := dr.getObject(ctx, req, node)
if err != nil {
if errors.IsNotFound(err) {
reqLogger.Error(err, "node object doesn't exist", "nodeName", req.Name)
return ctrl.Result{}, nil
}

reqLogger.Error(err, "failed to get node object from api re-queue the request", "nodeName", req.Name)
reqLogger.Error(err, "failed to get node object")
return ctrl.Result{}, err
}

// get sriovNodeNodeState object
nodeNetworkState := &sriovnetworkv1.SriovNetworkNodeState{}
err = dr.Get(ctx, req.NamespacedName, nodeNetworkState)
err = dr.getObject(ctx, req, nodeNetworkState)
if err != nil {
if errors.IsNotFound(err) {
reqLogger.Error(err, "sriovNetworkNodeState object doesn't exist", "nodeName", req.Name)
return ctrl.Result{}, nil
}

reqLogger.Error(err, "failed to get sriovNetworkNodeState object from api re-queue the request", "nodeName", req.Name)
reqLogger.Error(err, "failed to get sriovNetworkNodeState object")
return ctrl.Result{}, err
}

// create the drain state annotation if it doesn't exist in the sriovNetworkNodeState object
nodeStateDrainAnnotationCurrent, NodeStateDrainAnnotationCurrentExist := nodeNetworkState.Annotations[constants.NodeStateDrainAnnotationCurrent]
if !NodeStateDrainAnnotationCurrentExist {
err = utils.AnnotateObject(nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.DrainIdle, dr.Client)
if err != nil {
return ctrl.Result{}, err
}
nodeStateDrainAnnotationCurrent = constants.DrainIdle
nodeStateDrainAnnotationCurrent, err := dr.ensureAnnotationExists(nodeNetworkState, constants.NodeStateDrainAnnotationCurrent)
if err != nil {
reqLogger.Error(err, "failed to ensure nodeStateDrainAnnotation")
return ctrl.Result{}, err
}

// create the drain state annotation if it doesn't exist in the node object
nodeDrainAnnotation, nodeDrainAnnotationExist := node.Annotations[constants.NodeDrainAnnotation]
if !nodeDrainAnnotationExist {
err = utils.AnnotateObject(node, constants.NodeDrainAnnotation, constants.DrainIdle, dr.Client)
if err != nil {
return ctrl.Result{}, err
}
nodeDrainAnnotation = constants.DrainIdle
nodeDrainAnnotation, err := dr.ensureAnnotationExists(node, constants.NodeDrainAnnotation)
if err != nil {
reqLogger.Error(err, "failed to ensure nodeStateDrainAnnotation")
return ctrl.Result{}, err
}

reqLogger.V(2).Info("Drain annotations", "nodeAnnotation", nodeDrainAnnotation, "nodeStateAnnotation", nodeStateDrainAnnotationCurrent)
Expand Down Expand Up @@ -218,6 +154,7 @@ func (dr *DrainReconcile) reconcile(ctx context.Context, req ctrl.Request) (ctrl
corev1.EventTypeWarning,
"DrainController",
"node complete drain was not completed")
// TODO: make this time configurable
return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
}

Expand All @@ -235,7 +172,7 @@ func (dr *DrainReconcile) reconcile(ctx context.Context, req ctrl.Request) (ctrl
"node un drain completed")
return ctrl.Result{}, nil
}
} else {
} else if nodeDrainAnnotation == constants.DrainRequired || nodeDrainAnnotation == constants.RebootRequired {
// this cover the case a node request to drain or reboot

// nothing to do here we need to wait for the node to move back to idle
Expand Down Expand Up @@ -298,6 +235,38 @@ func (dr *DrainReconcile) reconcile(ctx context.Context, req ctrl.Request) (ctrl
return reconcile.Result{}, fmt.Errorf("unexpected node drain annotation")
}

func (dr *DrainReconcile) getObject(ctx context.Context, req ctrl.Request, object client.Object) error {
// configure logs
reqLogger := log.FromContext(ctx)
reqLogger.Info("checkForNodeDrain():")

err := dr.Get(ctx, req.NamespacedName, object)
if err != nil {
if errors.IsNotFound(err) {
reqLogger.Error(err, "object doesn't exist", "objectName", req.Name)
return nil
}

reqLogger.Error(err, "failed to get object from api re-queue the request", "objectName", req.Name)
return err
}

return nil
}

func (dr *DrainReconcile) ensureAnnotationExists(object client.Object, key string) (string, error) {
value, exist := object.GetAnnotations()[key]
if !exist {
err := utils.AnnotateObject(object, constants.NodeStateDrainAnnotationCurrent, constants.DrainIdle, dr.Client)
if err != nil {
return "", err
}
return constants.DrainIdle, nil
}

return value, nil
}

func (dr *DrainReconcile) checkForNodeDrain(ctx context.Context, node *corev1.Node) (*reconcile.Result, error) {
// configure logs
reqLogger := log.FromContext(ctx)
Expand Down
Loading

0 comments on commit 05a6158

Please sign in to comment.