Skip to content

Commit

Permalink
Fix comments and rebase
Browse files Browse the repository at this point in the history
Signed-off-by: Sebastian Sch <sebassch@gmail.com>
  • Loading branch information
SchSeba committed Mar 7, 2024
1 parent fb36c70 commit 1681015
Show file tree
Hide file tree
Showing 18 changed files with 288 additions and 328 deletions.
167 changes: 65 additions & 102 deletions controllers/drain_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package controllers
import (
"context"
"fmt"
"os"
"sync"
"time"

Expand All @@ -29,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -48,25 +48,24 @@ import (
"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/vars"
)

// Built-in drain policy used by findNodePoolConfig when a node does not
// belong to any SriovNetworkPoolConfig.
var (
	// oneNode is the MaxUnavailable value of the built-in policy: one node.
	oneNode = intstr.FromInt32(1)

	// defaultNpcl is the pool config applied to nodes that match no other
	// pool. Its NodeSelector is an empty (non-nil) label selector.
	// NOTE(review): an empty LabelSelector is documented to match every
	// object — confirm that selecting all nodes is the intended scope here.
	defaultNpcl = &sriovnetworkv1.SriovNetworkPoolConfig{
		Spec: sriovnetworkv1.SriovNetworkPoolConfigSpec{
			MaxUnavailable: &oneNode,
			NodeSelector:   &metav1.LabelSelector{},
		},
	}
)

type DrainReconcile struct {
client.Client
Scheme *runtime.Scheme
recorder record.EventRecorder
drainer drain.DrainInterface
resourcePrefix string

nodesInReconcile map[string]interface{}
nodesInReconcileMutex sync.Mutex
drainCheckMutex sync.Mutex
Scheme *runtime.Scheme
recorder record.EventRecorder
drainer drain.DrainInterface

drainCheckMutex sync.Mutex
}

func NewDrainReconcileController(client client.Client, Scheme *runtime.Scheme, recorder record.EventRecorder, platformHelper platforms.Interface) (*DrainReconcile, error) {
resourcePrefix := os.Getenv("RESOURCE_PREFIX")
if resourcePrefix == "" {
return nil, fmt.Errorf("RESOURCE_PREFIX environment variable can't be empty")
}

drainer, err := drain.NewDrainer(resourcePrefix, platformHelper)
drainer, err := drain.NewDrainer(platformHelper)
if err != nil {
return nil, err
}
Expand All @@ -76,30 +75,8 @@ func NewDrainReconcileController(client client.Client, Scheme *runtime.Scheme, r
Scheme,
recorder,
drainer,
resourcePrefix,
map[string]interface{}{},
sync.Mutex{},
sync.Mutex{}}, nil
}
// TryLockNode attempts to reserve nodeName for the calling reconcile loop.
// It returns true when the node was not already reserved and is now recorded
// in nodesInReconcile, and false when an entry for the node already exists.
func (dr *DrainReconcile) TryLockNode(nodeName string) bool {
	dr.nodesInReconcileMutex.Lock()
	defer dr.nodesInReconcileMutex.Unlock()

	if _, alreadyLocked := dr.nodesInReconcile[nodeName]; alreadyLocked {
		return false
	}

	// Only the presence of the key matters; the stored value is unused.
	dr.nodesInReconcile[nodeName] = nil
	return true
}

// unlockNode drops the reservation taken by TryLockNode for nodeName.
// Removing a node that is not currently reserved is a no-op.
func (dr *DrainReconcile) unlockNode(nodeName string) {
	dr.nodesInReconcileMutex.Lock()
	delete(dr.nodesInReconcile, nodeName)
	dr.nodesInReconcileMutex.Unlock()
}

//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch;update;patch
//+kubebuilder:rbac:groups=sriovnetwork.openshift.io,resources=sriovnodestates,verbs=get;list;watch
Expand All @@ -110,74 +87,39 @@ func (dr *DrainReconcile) unlockNode(nodeName string) {
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.8.3/pkg/reconcile
func (dr *DrainReconcile) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// Reserve the node for this reconcile loop. If the reservation fails,
	// another loop is already handling this node, so we just exit.
	if acquired := dr.TryLockNode(req.Name); !acquired {
		return ctrl.Result{}, nil
	}

	// configure logs
	logger := log.FromContext(ctx)
	logger.Info("Reconciling Drain")
	logger.V(2).Info("node locked for drain controller", "nodeName", req.Name)

	// The actual work lives in a separate function so that it can release
	// the node reservation with a deferred call.
	return dr.reconcile(ctx, req)
}

func (dr *DrainReconcile) reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// remove the lock when we exist the function
defer dr.unlockNode(req.Name)

req.Namespace = vars.Namespace

// configure logs
reqLogger := log.FromContext(ctx)

// get node object
node := &corev1.Node{}
err := dr.Get(ctx, req.NamespacedName, node)
err := dr.getObject(ctx, req, node)
if err != nil {
if errors.IsNotFound(err) {
reqLogger.Error(err, "node object doesn't exist", "nodeName", req.Name)
return ctrl.Result{}, nil
}

reqLogger.Error(err, "failed to get node object from api re-queue the request", "nodeName", req.Name)
reqLogger.Error(err, "failed to get node object")
return ctrl.Result{}, err
}

// get sriovNodeNodeState object
nodeNetworkState := &sriovnetworkv1.SriovNetworkNodeState{}
err = dr.Get(ctx, req.NamespacedName, nodeNetworkState)
err = dr.getObject(ctx, req, nodeNetworkState)
if err != nil {
if errors.IsNotFound(err) {
reqLogger.Error(err, "sriovNetworkNodeState object doesn't exist", "nodeName", req.Name)
return ctrl.Result{}, nil
}

reqLogger.Error(err, "failed to get sriovNetworkNodeState object from api re-queue the request", "nodeName", req.Name)
reqLogger.Error(err, "failed to get sriovNetworkNodeState object")
return ctrl.Result{}, err
}

// create the drain state annotation if it doesn't exist in the sriovNetworkNodeState object
nodeStateDrainAnnotationCurrent, NodeStateDrainAnnotationCurrentExist := nodeNetworkState.Annotations[constants.NodeStateDrainAnnotationCurrent]
if !NodeStateDrainAnnotationCurrentExist {
err = utils.AnnotateObject(nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.DrainIdle, dr.Client)
if err != nil {
return ctrl.Result{}, err
}
nodeStateDrainAnnotationCurrent = constants.DrainIdle
nodeStateDrainAnnotationCurrent, err := dr.ensureAnnotationExists(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent)
if err != nil {
reqLogger.Error(err, "failed to ensure nodeStateDrainAnnotation")
return ctrl.Result{}, err
}

// create the drain state annotation if it doesn't exist in the node object
nodeDrainAnnotation, nodeDrainAnnotationExist := node.Annotations[constants.NodeDrainAnnotation]
if !nodeDrainAnnotationExist {
err = utils.AnnotateObject(node, constants.NodeDrainAnnotation, constants.DrainIdle, dr.Client)
if err != nil {
return ctrl.Result{}, err
}
nodeDrainAnnotation = constants.DrainIdle
nodeDrainAnnotation, err := dr.ensureAnnotationExists(ctx, node, constants.NodeDrainAnnotation)
if err != nil {
reqLogger.Error(err, "failed to ensure nodeStateDrainAnnotation")
return ctrl.Result{}, err
}

reqLogger.V(2).Info("Drain annotations", "nodeAnnotation", nodeDrainAnnotation, "nodeStateAnnotation", nodeStateDrainAnnotationCurrent)
Expand Down Expand Up @@ -218,11 +160,12 @@ func (dr *DrainReconcile) reconcile(ctx context.Context, req ctrl.Request) (ctrl
corev1.EventTypeWarning,
"DrainController",
"node complete drain was not completed")
// TODO: make this time configurable
return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
}

// move the node state back to idle
err = utils.AnnotateObject(nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.DrainIdle, dr.Client)
err = utils.AnnotateObject(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.DrainIdle, dr.Client)
if err != nil {
reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.DrainIdle)
return ctrl.Result{}, err
Expand All @@ -235,7 +178,7 @@ func (dr *DrainReconcile) reconcile(ctx context.Context, req ctrl.Request) (ctrl
"node un drain completed")
return ctrl.Result{}, nil
}
} else {
} else if nodeDrainAnnotation == constants.DrainRequired || nodeDrainAnnotation == constants.RebootRequired {
// this cover the case a node request to drain or reboot

// nothing to do here we need to wait for the node to move back to idle
Expand All @@ -246,7 +189,7 @@ func (dr *DrainReconcile) reconcile(ctx context.Context, req ctrl.Request) (ctrl

// we need to start the drain, but first we need to check that we can drain the node
if nodeStateDrainAnnotationCurrent == constants.DrainIdle {
result, err := dr.checkForNodeDrain(ctx, node)
result, err := dr.tryDrainNode(ctx, node)
if err != nil {
reqLogger.Error(err, "failed to check if we can drain the node")
return ctrl.Result{}, err
Expand Down Expand Up @@ -280,7 +223,7 @@ func (dr *DrainReconcile) reconcile(ctx context.Context, req ctrl.Request) (ctrl
}

// if we manage to drain we label the node state with drain completed and finish
err = utils.AnnotateObject(nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.DrainComplete, dr.Client)
err = utils.AnnotateObject(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.DrainComplete, dr.Client)
if err != nil {
reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.DrainComplete)
return ctrl.Result{}, err
Expand All @@ -298,7 +241,38 @@ func (dr *DrainReconcile) reconcile(ctx context.Context, req ctrl.Request) (ctrl
return reconcile.Result{}, fmt.Errorf("unexpected node drain annotation")
}

func (dr *DrainReconcile) checkForNodeDrain(ctx context.Context, node *corev1.Node) (*reconcile.Result, error) {
// getObject fetches the object identified by req.NamespacedName into object
// using the reconciler's client.
//
// A NotFound error is logged and swallowed: the function returns nil in that
// case. Any other error from the API is logged and returned to the caller.
//
// NOTE(review): because NotFound yields nil, callers cannot distinguish a
// missing object from a fetched one — confirm this is intended.
func (dr *DrainReconcile) getObject(ctx context.Context, req ctrl.Request, object client.Object) error {
	logger := log.FromContext(ctx)
	logger.Info("getObject():")

	err := dr.Get(ctx, req.NamespacedName, object)
	switch {
	case err == nil:
		return nil
	case errors.IsNotFound(err):
		logger.Error(err, "object doesn't exist", "objectName", req.Name)
		return nil
	default:
		logger.Error(err, "failed to get object from api re-queue the request", "objectName", req.Name)
		return err
	}
}

// ensureAnnotationExists returns the value of the annotation key on object,
// creating the annotation with the constants.DrainIdle value when it is
// missing.
//
// When the annotation is absent, it is written through utils.AnnotateObject
// and constants.DrainIdle is returned. If that write fails, an empty string
// and the error are returned.
func (dr *DrainReconcile) ensureAnnotationExists(ctx context.Context, object client.Object, key string) (string, error) {
	if value, exist := object.GetAnnotations()[key]; exist {
		return value, nil
	}

	// Write the annotation that was actually requested. This helper is called
	// with both constants.NodeStateDrainAnnotationCurrent (node state object)
	// and constants.NodeDrainAnnotation (node object), so the key must not be
	// hard-coded to either of them.
	if err := utils.AnnotateObject(ctx, object, key, constants.DrainIdle, dr.Client); err != nil {
		return "", err
	}

	return constants.DrainIdle, nil
}

func (dr *DrainReconcile) tryDrainNode(ctx context.Context, node *corev1.Node) (*reconcile.Result, error) {
// configure logs
reqLogger := log.FromContext(ctx)
reqLogger.Info("checkForNodeDrain():")
Expand Down Expand Up @@ -361,7 +335,7 @@ func (dr *DrainReconcile) checkForNodeDrain(ctx context.Context, node *corev1.No
return nil, fmt.Errorf("failed to find sriov network node state for requested node")
}

err = utils.AnnotateObject(currentSnns, constants.NodeStateDrainAnnotationCurrent, constants.Draining, dr.Client)
err = utils.AnnotateObject(ctx, currentSnns, constants.NodeStateDrainAnnotationCurrent, constants.Draining, dr.Client)
if err != nil {
reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.Draining)
return nil, err
Expand All @@ -382,20 +356,13 @@ func (dr *DrainReconcile) findNodePoolConfig(ctx context.Context, node *corev1.N
}

selectedNpcl := []*sriovnetworkv1.SriovNetworkPoolConfig{}
var defaultNpcl *sriovnetworkv1.SriovNetworkPoolConfig

for _, npc := range npcl.Items {
// we skip hw offload objects
if npc.Spec.OvsHardwareOffloadConfig.Name != "" {
continue
}

// we save the default to use it if we don't find any other one
if npc.GetName() == "default" {
defaultNpcl = npc.DeepCopy()
continue
}

// if the node selector is empty we skip
if npc.Spec.NodeSelector == nil {
continue
Expand Down Expand Up @@ -439,11 +406,7 @@ func (dr *DrainReconcile) findNodePoolConfig(ctx context.Context, node *corev1.N

return selectedNpcl[0], nodeList.Items, nil
} else {
// we use the default policy if it was found
if defaultNpcl == nil {
return nil, nil, fmt.Errorf("failed to find the default sriov network pool config")
}
logger.V(2).Info("found sriovNetworkPool", "pool", *defaultNpcl)
logger.V(2).Info("node doesn't belong to any pool, using default drain configuration with MaxUnavailable of one", "pool", *defaultNpcl)

selector, err := metav1.LabelSelectorAsSelector(defaultNpcl.Spec.NodeSelector)
if err != nil {
Expand Down
Loading

0 comments on commit 1681015

Please sign in to comment.