From 3b00886d8fea7e16f2e288e8466a4cde6a1e94e1 Mon Sep 17 00:00:00 2001 From: Sebastian Sch Date: Thu, 25 Jul 2024 11:20:08 +0300 Subject: [PATCH] This commit introduces a new redesign on how the operator resets the device plugin * use a general nodeSelector to avoid updating the daemonset yaml * remove the config-daemon removing pod (better security) * make the operator in charge of resetting the device plugin via annotations * mark the node as cordon BEFORE we remove the device plugin (without drain) to avoid scheduling new pods until the device plugin is backed up Signed-off-by: Sebastian Sch --- controllers/drain_controller.go | 149 +++++++- controllers/helper.go | 106 +----- controllers/helper_test.go | 330 ------------------ .../sriovnetworknodepolicy_controller.go | 4 - controllers/sriovoperatorconfig_controller.go | 2 +- .../sriovoperatorconfig_controller_test.go | 48 --- deploy/clusterrole.yaml | 6 - .../templates/clusterrole.yaml | 6 - pkg/consts/constants.go | 5 + pkg/daemon/daemon.go | 87 ++--- pkg/daemon/daemon_test.go | 36 +- pkg/drain/drainer.go | 19 +- pkg/platforms/openshift/openshift.go | 12 + pkg/utils/cluster.go | 40 ++- 14 files changed, 268 insertions(+), 582 deletions(-) delete mode 100644 controllers/helper_test.go diff --git a/controllers/drain_controller.go b/controllers/drain_controller.go index 86da909d8..f0a63af36 100644 --- a/controllers/drain_controller.go +++ b/controllers/drain_controller.go @@ -172,6 +172,29 @@ func (dr *DrainReconcile) Reconcile(ctx context.Context, req ctrl.Request) (ctrl return reconcile.Result{RequeueAfter: 5 * time.Second}, nil } + // check the device plugin exited and enable it again + // only of we have something in the node state spec + if len(nodeNetworkState.Spec.Interfaces) > 0 { + completed, err = dr.enableSriovDevicePlugin(ctx, node) + if err != nil { + reqLogger.Error(err, "failed to enable SriovDevicePlugin") + dr.recorder.Event(nodeNetworkState, + corev1.EventTypeWarning, + "DrainController", + 
"failed to enable SriovDevicePlugin") + return ctrl.Result{}, err + } + + if !completed { + reqLogger.Info("sriov device plugin enable was not completed") + dr.recorder.Event(nodeNetworkState, + corev1.EventTypeWarning, + "DrainController", + "sriov device plugin enable was not completed") + return reconcile.Result{RequeueAfter: 5 * time.Second}, nil + } + } + // move the node state back to idle err = utils.AnnotateObject(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.DrainIdle, dr.Client) if err != nil { @@ -209,7 +232,7 @@ func (dr *DrainReconcile) Reconcile(ctx context.Context, req ctrl.Request) (ctrl } } - // class the drain function that will also call drain to other platform providers like openshift + // call the drain function that will also call drain to other platform providers like openshift drained, err := dr.drainer.DrainNode(ctx, node, nodeDrainAnnotation == constants.RebootRequired) if err != nil { reqLogger.Error(err, "error trying to drain the node") @@ -230,6 +253,17 @@ func (dr *DrainReconcile) Reconcile(ctx context.Context, req ctrl.Request) (ctrl return reconcile.Result{RequeueAfter: 5 * time.Second}, nil } + reqLogger.Info("remove Device plugin from node") + err = utils.LabelNode(ctx, node.Name, constants.SriovDevicePluginEnabledLabel, constants.SriovDevicePluginEnabledLabelDisabled, dr.Client) + if err != nil { + log.Log.Error(err, "failed to label node for device plugin label", + "labelKey", + constants.SriovDevicePluginEnabledLabel, + "labelValue", + constants.SriovDevicePluginEnabledLabelDisabled) + return reconcile.Result{}, err + } + // if we manage to drain we label the node state with drain completed and finish err = utils.AnnotateObject(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.DrainComplete, dr.Client) if err != nil { @@ -243,6 +277,60 @@ func (dr *DrainReconcile) Reconcile(ctx context.Context, req ctrl.Request) (ctrl "DrainController", "node drain completed") return 
ctrl.Result{}, nil + } else if nodeDrainAnnotation == constants.DevicePluginResetRequired { + // nothing to do here we need to wait for the node to move back to idle + if nodeStateDrainAnnotationCurrent == constants.DrainComplete { + reqLogger.Info("node requested a drain and nodeState is on drain completed nothing todo") + return ctrl.Result{}, nil + } + + // if we are on idle state we move it to drain + if nodeStateDrainAnnotationCurrent == constants.DrainIdle { + err = utils.AnnotateObject(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.Draining, dr.Client) + if err != nil { + reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.Draining) + return ctrl.Result{}, err + } + return ctrl.Result{}, nil + } + + // This cover a case where we only need to reset the device plugin + // for that we are going to cordon the node, so we don't get new pods allocated + // to the node in the time we remove the device plugin + err = dr.drainer.RunCordonOrUncordon(ctx, node, true) + if err != nil { + log.Log.Error(err, "failed to cordon on node") + return reconcile.Result{}, err + } + + // we switch the sriov label to disable and mark the drain as completed + // no need to wait for the device plugin to exist here as we cordon the node, + // and we want to config-daemon to start the configuration in parallel of the kube-controller to remove the pod + // we check the device plugin was removed when the config-daemon moves is desire state to idle + reqLogger.Info("disable Device plugin from node") + err = utils.LabelNode(ctx, node.Name, constants.SriovDevicePluginEnabledLabel, constants.SriovDevicePluginEnabledLabelDisabled, dr.Client) + if err != nil { + log.Log.Error(err, "failed to label node for device plugin label", + "labelKey", + constants.SriovDevicePluginEnabledLabel, + "labelValue", + constants.SriovDevicePluginEnabledLabelDisabled) + return reconcile.Result{}, err + } + + // if we manage to cordon we label the 
node state with drain completed and finish + err = utils.AnnotateObject(ctx, nodeNetworkState, constants.NodeStateDrainAnnotationCurrent, constants.DrainComplete, dr.Client) + if err != nil { + reqLogger.Error(err, "failed to annotate node with annotation", "annotation", constants.DrainComplete) + return ctrl.Result{}, err + } + + reqLogger.Info("node cordoned successfully and device plugin removed") + dr.recorder.Event(nodeNetworkState, + corev1.EventTypeWarning, + "DrainController", + "node cordoned and device plugin removed completed") + return ctrl.Result{}, nil } reqLogger.Error(nil, "unexpected node drain annotation") @@ -436,6 +524,65 @@ func (dr *DrainReconcile) findNodePoolConfig(ctx context.Context, node *corev1.N } } +// enableSriovDevicePlugin change the device plugin label on the requested node to enable +// if there is a pod still running we will return false +func (dr *DrainReconcile) enableSriovDevicePlugin(ctx context.Context, node *corev1.Node) (bool, error) { + logger := log.FromContext(ctx) + logger.Info("enableSriovDevicePlugin():") + pods := &corev1.PodList{} + + // check if the device plugin is terminating only if the node annotation for device plugin is disabled + if node.Annotations[constants.SriovDevicePluginEnabledLabel] == constants.SriovDevicePluginEnabledLabelDisabled { + err := dr.List(context.Background(), pods, &client.ListOptions{ + Raw: &metav1.ListOptions{ + LabelSelector: "app=sriov-device-plugin", + FieldSelector: "spec.nodeName=" + node.Name, + ResourceVersion: "0"}, + }) + + if err != nil { + logger.Error(err, "failed to list device plugin pods running on node") + return false, err + } + + if len(pods.Items) != 0 { + log.Log.V(2).Info("device plugin pod still terminating on node") + return false, nil + } + } + + logger.Info("enable Device plugin from node") + err := utils.LabelNode(ctx, node.Name, constants.SriovDevicePluginEnabledLabel, constants.SriovDevicePluginEnabledLabelEnabled, dr.Client) + if err != nil { + 
log.Log.Error(err, "failed to label node for device plugin label", + "labelKey", + constants.SriovDevicePluginEnabledLabel, + "labelValue", + constants.SriovDevicePluginEnabledLabelEnabled) + return false, err + } + + // check if the device plugin pod is running on the node + err = dr.List(context.Background(), pods, &client.ListOptions{ + Raw: &metav1.ListOptions{ + LabelSelector: "app=sriov-device-plugin", + FieldSelector: "spec.nodeName=" + node.Name, + ResourceVersion: "0"}, + }) + if err != nil { + logger.Error(err, "failed to list device plugin pods running on node") + return false, err + } + + if len(pods.Items) == 1 && pods.Items[0].Status.Phase == corev1.PodRunning { + logger.Info("Device plugin pod running on node") + return true, nil + } + + logger.V(2).Info("Device plugin pod still not running on node") + return false, nil +} + // SetupWithManager sets up the controller with the Manager. func (dr *DrainReconcile) SetupWithManager(mgr ctrl.Manager) error { createUpdateEnqueue := handler.Funcs{ diff --git a/controllers/helper.go b/controllers/helper.go index 9ff735473..17291ff71 100644 --- a/controllers/helper.go +++ b/controllers/helper.go @@ -22,12 +22,10 @@ import ( "encoding/json" "fmt" "os" - "sort" "strings" errs "github.com/pkg/errors" appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" uns "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -152,29 +150,25 @@ func formatJSON(str string) (string, error) { return prettyJSON.String(), nil } +// GetDefaultNodeSelector return a nodeSelector with worker and linux os func GetDefaultNodeSelector() map[string]string { return map[string]string{"node-role.kubernetes.io/worker": "", "kubernetes.io/os": "linux"} } -// hasNoValidPolicy returns true if no SriovNetworkNodePolicy -// or only the (deprecated) "default" policy is present -func hasNoValidPolicy(pl []sriovnetworkv1.SriovNetworkNodePolicy) bool { - switch 
len(pl) { - case 0: - return true - case 1: - return pl[0].Name == constants.DefaultPolicyName - default: - return false - } +// GetDefaultNodeSelectorForDevicePlugin return a nodeSelector with worker linux os +// and the enabled sriov device plugin +func GetDefaultNodeSelectorForDevicePlugin() map[string]string { + return map[string]string{ + "node-role.kubernetes.io/worker": "", + "kubernetes.io/os": "linux", + constants.SriovDevicePluginEnabledLabel: constants.SriovDevicePluginEnabledLabelEnabled} } func syncPluginDaemonObjs(ctx context.Context, client k8sclient.Client, scheme *runtime.Scheme, - dc *sriovnetworkv1.SriovOperatorConfig, - pl *sriovnetworkv1.SriovNetworkNodePolicyList) error { + dc *sriovnetworkv1.SriovOperatorConfig) error { logger := log.Log.WithName("syncPluginDaemonObjs") logger.V(1).Info("Start to sync sriov daemons objects") @@ -185,7 +179,7 @@ func syncPluginDaemonObjs(ctx context.Context, data.Data["ReleaseVersion"] = os.Getenv("RELEASEVERSION") data.Data["ResourcePrefix"] = vars.ResourcePrefix data.Data["ImagePullSecrets"] = GetImagePullSecrets() - data.Data["NodeSelectorField"] = GetDefaultNodeSelector() + data.Data["NodeSelectorField"] = GetDefaultNodeSelectorForDevicePlugin() data.Data["UseCDI"] = dc.Spec.UseCDI objs, err := renderDsForCR(constants.PluginPath, &data) if err != nil { @@ -193,16 +187,6 @@ func syncPluginDaemonObjs(ctx context.Context, return err } - if hasNoValidPolicy(pl.Items) { - for _, obj := range objs { - err := deleteK8sResource(ctx, client, obj) - if err != nil { - return err - } - } - return nil - } - // Sync DaemonSets for _, obj := range objs { if obj.GetKind() == constants.DaemonSet && len(dc.Spec.ConfigDaemonNodeSelector) > 0 { @@ -214,13 +198,15 @@ func syncPluginDaemonObjs(ctx context.Context, return err } ds.Spec.Template.Spec.NodeSelector = dc.Spec.ConfigDaemonNodeSelector + // add the special node selector for the device plugin + ds.Spec.Template.Spec.NodeSelector[constants.SriovDevicePluginEnabledLabel] 
= constants.SriovDevicePluginEnabledLabelEnabled err = scheme.Convert(ds, obj, nil) if err != nil { logger.Error(err, "Fail to convert to Unstructured") return err } } - err = syncDsObject(ctx, client, scheme, dc, pl, obj) + err = syncDsObject(ctx, client, scheme, dc, obj) if err != nil { logger.Error(err, "Couldn't sync SR-IoV daemons objects") return err @@ -230,14 +216,7 @@ func syncPluginDaemonObjs(ctx context.Context, return nil } -func deleteK8sResource(ctx context.Context, client k8sclient.Client, in *uns.Unstructured) error { - if err := apply.DeleteObject(ctx, client, in); err != nil { - return fmt.Errorf("failed to delete object %v with err: %v", in, err) - } - return nil -} - -func syncDsObject(ctx context.Context, client k8sclient.Client, scheme *runtime.Scheme, dc *sriovnetworkv1.SriovOperatorConfig, pl *sriovnetworkv1.SriovNetworkNodePolicyList, obj *uns.Unstructured) error { +func syncDsObject(ctx context.Context, client k8sclient.Client, scheme *runtime.Scheme, dc *sriovnetworkv1.SriovOperatorConfig, obj *uns.Unstructured) error { logger := log.Log.WithName("syncDsObject") kind := obj.GetKind() logger.V(1).Info("Start to sync Objects", "Kind", kind) @@ -257,7 +236,7 @@ func syncDsObject(ctx context.Context, client k8sclient.Client, scheme *runtime. logger.Error(err, "Fail to convert to DaemonSet") return err } - err = syncDaemonSet(ctx, client, scheme, dc, pl, ds) + err = syncDaemonSet(ctx, client, scheme, dc, ds) if err != nil { logger.Error(err, "Fail to sync DaemonSet", "Namespace", ds.Namespace, "Name", ds.Name) return err @@ -266,54 +245,6 @@ func syncDsObject(ctx context.Context, client k8sclient.Client, scheme *runtime. 
return nil } -func setDsNodeAffinity(pl *sriovnetworkv1.SriovNetworkNodePolicyList, ds *appsv1.DaemonSet) error { - terms := nodeSelectorTermsForPolicyList(pl.Items) - if len(terms) > 0 { - ds.Spec.Template.Spec.Affinity = &corev1.Affinity{ - NodeAffinity: &corev1.NodeAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ - NodeSelectorTerms: terms, - }, - }, - } - } - return nil -} - -func nodeSelectorTermsForPolicyList(policies []sriovnetworkv1.SriovNetworkNodePolicy) []corev1.NodeSelectorTerm { - terms := []corev1.NodeSelectorTerm{} - for _, p := range policies { - // Note(adrianc): default policy is deprecated and ignored. - if p.Name == constants.DefaultPolicyName { - continue - } - - if len(p.Spec.NodeSelector) == 0 { - continue - } - expressions := []corev1.NodeSelectorRequirement{} - for k, v := range p.Spec.NodeSelector { - exp := corev1.NodeSelectorRequirement{ - Operator: corev1.NodeSelectorOpIn, - Key: k, - Values: []string{v}, - } - expressions = append(expressions, exp) - } - // sorting is needed to keep the daemon spec stable. 
- // the items are popped in a random order from the map - sort.Slice(expressions, func(i, j int) bool { - return expressions[i].Key < expressions[j].Key - }) - nodeSelector := corev1.NodeSelectorTerm{ - MatchExpressions: expressions, - } - terms = append(terms, nodeSelector) - } - - return terms -} - // renderDsForCR returns a busybox pod with the same name/namespace as the cr func renderDsForCR(path string, data *render.RenderData) ([]*uns.Unstructured, error) { logger := log.Log.WithName("renderDsForCR") @@ -326,16 +257,11 @@ func renderDsForCR(path string, data *render.RenderData) ([]*uns.Unstructured, e return objs, nil } -func syncDaemonSet(ctx context.Context, client k8sclient.Client, scheme *runtime.Scheme, dc *sriovnetworkv1.SriovOperatorConfig, pl *sriovnetworkv1.SriovNetworkNodePolicyList, in *appsv1.DaemonSet) error { +func syncDaemonSet(ctx context.Context, client k8sclient.Client, scheme *runtime.Scheme, dc *sriovnetworkv1.SriovOperatorConfig, in *appsv1.DaemonSet) error { logger := log.Log.WithName("syncDaemonSet") logger.V(1).Info("Start to sync DaemonSet", "Namespace", in.Namespace, "Name", in.Name) var err error - if pl != nil { - if err = setDsNodeAffinity(pl, in); err != nil { - return err - } - } if err = controllerutil.SetControllerReference(dc, in, scheme); err != nil { return err } diff --git a/controllers/helper_test.go b/controllers/helper_test.go deleted file mode 100644 index d998cf0da..000000000 --- a/controllers/helper_test.go +++ /dev/null @@ -1,330 +0,0 @@ -/* - - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-See the License for the specific language governing permissions and -limitations under the License. -*/ - -package controllers - -import ( - "context" - "sync" - "testing" - - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - - "github.com/google/go-cmp/cmp" - appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - controllerruntime "sigs.k8s.io/controller-runtime" - - sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1" - "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/vars" -) - -func TestNodeSelectorMerge(t *testing.T) { - table := []struct { - tname string - policies []sriovnetworkv1.SriovNetworkNodePolicy - expected []corev1.NodeSelectorTerm - }{ - { - tname: "testoneselector", - policies: []sriovnetworkv1.SriovNetworkNodePolicy{ - { - Spec: sriovnetworkv1.SriovNetworkNodePolicySpec{ - NodeSelector: map[string]string{ - "foo": "bar", - }, - }, - }, - { - Spec: sriovnetworkv1.SriovNetworkNodePolicySpec{ - NodeSelector: map[string]string{ - "bb": "cc", - }, - }, - }, - }, - expected: []corev1.NodeSelectorTerm{ - { - MatchExpressions: []corev1.NodeSelectorRequirement{ - { - Operator: corev1.NodeSelectorOpIn, - Key: "foo", - Values: []string{"bar"}, - }, - }, - }, - { - MatchExpressions: []corev1.NodeSelectorRequirement{ - { - Operator: corev1.NodeSelectorOpIn, - Key: "bb", - Values: []string{"cc"}, - }, - }, - }, - }, - }, - { - tname: "testtwoselectors", - policies: []sriovnetworkv1.SriovNetworkNodePolicy{ - { - Spec: sriovnetworkv1.SriovNetworkNodePolicySpec{ - NodeSelector: map[string]string{ - "foo": "bar", - "foo1": "bar1", - }, - }, - }, - { - Spec: sriovnetworkv1.SriovNetworkNodePolicySpec{ - NodeSelector: map[string]string{ - "bb": "cc", - "bb1": "cc1", - "bb2": "cc2", - }, - }, - }, - }, - expected: []corev1.NodeSelectorTerm{ - { - MatchExpressions: 
[]corev1.NodeSelectorRequirement{ - { - Operator: corev1.NodeSelectorOpIn, - Key: "foo", - Values: []string{"bar"}, - }, - { - Operator: corev1.NodeSelectorOpIn, - Key: "foo1", - Values: []string{"bar1"}, - }, - }, - }, - { - MatchExpressions: []corev1.NodeSelectorRequirement{ - { - Operator: corev1.NodeSelectorOpIn, - Key: "bb", - Values: []string{"cc"}, - }, - { - Operator: corev1.NodeSelectorOpIn, - Key: "bb1", - Values: []string{"cc1"}, - }, - { - Operator: corev1.NodeSelectorOpIn, - Key: "bb2", - Values: []string{"cc2"}, - }, - }, - }, - }, - }, - { - tname: "testemptyselector", - policies: []sriovnetworkv1.SriovNetworkNodePolicy{ - { - Spec: sriovnetworkv1.SriovNetworkNodePolicySpec{ - NodeSelector: map[string]string{}, - }, - }, - }, - expected: []corev1.NodeSelectorTerm{}, - }, - } - - for _, tc := range table { - t.Run(tc.tname, func(t *testing.T) { - selectors := nodeSelectorTermsForPolicyList(tc.policies) - if !cmp.Equal(selectors, tc.expected) { - t.Error(tc.tname, "Selectors not as expected", cmp.Diff(selectors, tc.expected)) - } - }) - } -} - -var _ = Describe("Helper Validation", Ordered, func() { - - var cancel context.CancelFunc - var ctx context.Context - var dc *sriovnetworkv1.SriovOperatorConfig - var in *appsv1.DaemonSet - - BeforeAll(func() { - By("Setup controller manager") - k8sManager, err := setupK8sManagerForTest() - Expect(err).ToNot(HaveOccurred()) - - ctx, cancel = context.WithCancel(context.Background()) - - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - defer GinkgoRecover() - By("Start controller manager") - err := k8sManager.Start(ctx) - Expect(err).ToNot(HaveOccurred()) - }() - - DeferCleanup(func() { - By("Shutdown controller manager") - cancel() - wg.Wait() - }) - }) - - BeforeEach(func() { - dc = &sriovnetworkv1.SriovOperatorConfig{ - ObjectMeta: controllerruntime.ObjectMeta{ - Name: "default", - Namespace: vars.Namespace, - UID: "12312312"}} - in = &appsv1.DaemonSet{ - ObjectMeta: 
controllerruntime.ObjectMeta{ - Name: "sriov-device-plugin", - Namespace: vars.Namespace}, - Spec: appsv1.DaemonSetSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{"app": "sriov-device-plugin"}}, - Template: corev1.PodTemplateSpec{ - ObjectMeta: controllerruntime.ObjectMeta{ - Labels: map[string]string{"app": "sriov-device-plugin"}}, - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Image: "test:latest", - Name: "test", - }, - }, - }, - }}} - - err := k8sClient.Delete(ctx, in) - if err != nil { - Expect(errors.IsNotFound(err)).To(BeTrue()) - } - }) - - Context("syncDaemonSet", func() { - It("should create a new daemon", func() { - pl := &sriovnetworkv1.SriovNetworkNodePolicyList{Items: []sriovnetworkv1.SriovNetworkNodePolicy{ - {ObjectMeta: controllerruntime.ObjectMeta{Name: "test", Namespace: vars.Namespace}}, - }} - err := syncDaemonSet(ctx, k8sClient, vars.Scheme, dc, pl, in) - Expect(err).ToNot(HaveOccurred()) - Expect(in.Spec.Template.Spec.Affinity).To(BeNil()) - }) - It("should update affinity", func() { - pl := &sriovnetworkv1.SriovNetworkNodePolicyList{Items: []sriovnetworkv1.SriovNetworkNodePolicy{ - { - ObjectMeta: controllerruntime.ObjectMeta{ - Name: "test", - Namespace: vars.Namespace, - }, - Spec: sriovnetworkv1.SriovNetworkNodePolicySpec{ - NodeSelector: map[string]string{"test": "test"}, - }, - }, - }} - - err := k8sClient.Create(ctx, in) - Expect(err).ToNot(HaveOccurred()) - - err = syncDaemonSet(ctx, k8sClient, vars.Scheme, dc, pl, in) - Expect(err).ToNot(HaveOccurred()) - Expect(in.Spec.Template.Spec.Affinity).ToNot(BeNil()) - Expect(in.Spec.Template.Spec.Affinity.NodeAffinity).ToNot(BeNil()) - Expect(in.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution).ToNot(BeNil()) - Expect(len(in.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms)).To(Equal(1)) - }) - It("should update affinity with multiple", func() { - pl := 
&sriovnetworkv1.SriovNetworkNodePolicyList{Items: []sriovnetworkv1.SriovNetworkNodePolicy{ - { - ObjectMeta: controllerruntime.ObjectMeta{ - Name: "test", - Namespace: vars.Namespace, - }, - Spec: sriovnetworkv1.SriovNetworkNodePolicySpec{ - NodeSelector: map[string]string{"test": "test"}, - }, - }, - { - ObjectMeta: controllerruntime.ObjectMeta{ - Name: "test1", - Namespace: vars.Namespace, - }, - Spec: sriovnetworkv1.SriovNetworkNodePolicySpec{ - NodeSelector: map[string]string{"test1": "test"}, - }, - }, - }} - - err := k8sClient.Create(ctx, in) - Expect(err).ToNot(HaveOccurred()) - - err = syncDaemonSet(ctx, k8sClient, vars.Scheme, dc, pl, in) - Expect(err).ToNot(HaveOccurred()) - Expect(in.Spec.Template.Spec.Affinity).ToNot(BeNil()) - Expect(in.Spec.Template.Spec.Affinity.NodeAffinity).ToNot(BeNil()) - Expect(in.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution).ToNot(BeNil()) - Expect(len(in.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms)).To(Equal(2)) - }) - It("should switch affinity", func() { - pl := &sriovnetworkv1.SriovNetworkNodePolicyList{Items: []sriovnetworkv1.SriovNetworkNodePolicy{ - { - ObjectMeta: controllerruntime.ObjectMeta{ - Name: "test1", - Namespace: vars.Namespace, - }, - Spec: sriovnetworkv1.SriovNetworkNodePolicySpec{ - NodeSelector: map[string]string{"test1": "test"}, - }, - }, - }} - - in.Spec.Template.Spec.Affinity = &corev1.Affinity{ - NodeAffinity: &corev1.NodeAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ - NodeSelectorTerms: []corev1.NodeSelectorTerm{{ - MatchExpressions: []corev1.NodeSelectorRequirement{{ - Operator: corev1.NodeSelectorOpIn, - Key: "test", - Values: []string{"test"}, - }}, - }}, - }, - }, - } - - err := k8sClient.Create(ctx, in) - Expect(err).ToNot(HaveOccurred()) - - err = syncDaemonSet(ctx, k8sClient, vars.Scheme, dc, pl, in) - Expect(err).ToNot(HaveOccurred()) - 
Expect(in.Spec.Template.Spec.Affinity).ToNot(BeNil()) - Expect(in.Spec.Template.Spec.Affinity.NodeAffinity).ToNot(BeNil()) - Expect(in.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution).ToNot(BeNil()) - Expect(len(in.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms)).To(Equal(1)) - Expect(len(in.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions)).To(Equal(1)) - Expect(in.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0].Key).To(Equal("test1")) - }) - }) -}) diff --git a/controllers/sriovnetworknodepolicy_controller.go b/controllers/sriovnetworknodepolicy_controller.go index e5abc5f08..79ff7328c 100644 --- a/controllers/sriovnetworknodepolicy_controller.go +++ b/controllers/sriovnetworknodepolicy_controller.go @@ -133,10 +133,6 @@ func (r *SriovNetworkNodePolicyReconciler) Reconcile(ctx context.Context, req ct if err = r.syncDevicePluginConfigMap(ctx, defaultOpConf, policyList, nodeList); err != nil { return reconcile.Result{}, err } - // Render and sync Daemon objects - if err = syncPluginDaemonObjs(ctx, r.Client, r.Scheme, defaultOpConf, policyList); err != nil { - return reconcile.Result{}, err - } // All was successful. Request that this be re-triggered after ResyncPeriod, // so we can reconcile state again. diff --git a/controllers/sriovoperatorconfig_controller.go b/controllers/sriovoperatorconfig_controller.go index 11cfdfc36..ebe65e363 100644 --- a/controllers/sriovoperatorconfig_controller.go +++ b/controllers/sriovoperatorconfig_controller.go @@ -124,7 +124,7 @@ func (r *SriovOperatorConfigReconciler) Reconcile(ctx context.Context, req ctrl. 
return reconcile.Result{}, err } - if err = syncPluginDaemonObjs(ctx, r.Client, r.Scheme, defaultConfig, policyList); err != nil { + if err = syncPluginDaemonObjs(ctx, r.Client, r.Scheme, defaultConfig); err != nil { return reconcile.Result{}, err } diff --git a/controllers/sriovoperatorconfig_controller_test.go b/controllers/sriovoperatorconfig_controller_test.go index 0dcc985f2..868a55c40 100644 --- a/controllers/sriovoperatorconfig_controller_test.go +++ b/controllers/sriovoperatorconfig_controller_test.go @@ -2,7 +2,6 @@ package controllers import ( "context" - "fmt" "os" "strings" "sync" @@ -11,7 +10,6 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" @@ -429,51 +427,5 @@ var _ = Describe("SriovOperatorConfig controller", Ordered, func() { g.Expect(injectorCfg.Webhooks[0].ClientConfig.CABundle).To(Equal([]byte("ca-bundle-2\n"))) }, "1s").Should(Succeed()) }) - It("should reconcile to a converging state when multiple node policies are set", func() { - By("Creating a consistent number of node policies") - for i := 0; i < 30; i++ { - p := &sriovnetworkv1.SriovNetworkNodePolicy{ - ObjectMeta: metav1.ObjectMeta{Namespace: testNamespace, Name: fmt.Sprintf("p%d", i)}, - Spec: sriovnetworkv1.SriovNetworkNodePolicySpec{ - Priority: 99, - NodeSelector: map[string]string{"foo": fmt.Sprintf("v%d", i)}, - }, - } - err := k8sClient.Create(context.Background(), p) - Expect(err).NotTo(HaveOccurred()) - } - - By("Triggering a the reconcile loop") - config := &sriovnetworkv1.SriovOperatorConfig{} - err := k8sClient.Get(context.Background(), types.NamespacedName{Name: "default", Namespace: testNamespace}, config) - Expect(err).NotTo(HaveOccurred()) - if config.ObjectMeta.Labels == nil { - config.ObjectMeta.Labels = make(map[string]string) - } - config.ObjectMeta.Labels["trigger-test"] = 
"test-reconcile-daemonset" - err = k8sClient.Update(context.Background(), config) - Expect(err).NotTo(HaveOccurred()) - - By("Wait until device-plugin Daemonset's affinity has been calculated") - var expectedAffinity *corev1.Affinity - - Eventually(func(g Gomega) { - daemonSet := &appsv1.DaemonSet{} - err = k8sClient.Get(context.Background(), types.NamespacedName{Name: "sriov-device-plugin", Namespace: testNamespace}, daemonSet) - g.Expect(err).NotTo(HaveOccurred()) - // Wait until the last policy (with NodeSelector foo=v29) has been considered at least one time - g.Expect(daemonSet.Spec.Template.Spec.Affinity.String()).To(ContainSubstring("v29")) - expectedAffinity = daemonSet.Spec.Template.Spec.Affinity - }, "3s", "1s").Should(Succeed()) - - By("Verify device-plugin Daemonset's affinity doesn't change over time") - Consistently(func(g Gomega) { - daemonSet := &appsv1.DaemonSet{} - err = k8sClient.Get(context.Background(), types.NamespacedName{Name: "sriov-device-plugin", Namespace: testNamespace}, daemonSet) - g.Expect(err).NotTo(HaveOccurred()) - g.Expect(daemonSet.Spec.Template.Spec.Affinity). 
- To(Equal(expectedAffinity)) - }, "3s", "1s").Should(Succeed()) - }) }) }) diff --git a/deploy/clusterrole.yaml b/deploy/clusterrole.yaml index e7a596061..e7a84394e 100644 --- a/deploy/clusterrole.yaml +++ b/deploy/clusterrole.yaml @@ -45,12 +45,6 @@ rules: - apiGroups: [""] resources: ["nodes"] verbs: ["get", "list", "watch", "patch", "update"] -- apiGroups: [""] - resources: ["pods"] - verbs: ["*"] -- apiGroups: ["apps"] - resources: ["daemonsets"] - verbs: ["get"] - apiGroups: [ "config.openshift.io" ] resources: [ "infrastructures" ] verbs: [ "get", "list", "watch" ] diff --git a/deployment/sriov-network-operator-chart/templates/clusterrole.yaml b/deployment/sriov-network-operator-chart/templates/clusterrole.yaml index 7cd8fd014..519d2c05c 100644 --- a/deployment/sriov-network-operator-chart/templates/clusterrole.yaml +++ b/deployment/sriov-network-operator-chart/templates/clusterrole.yaml @@ -49,12 +49,6 @@ rules: - apiGroups: [""] resources: ["nodes"] verbs: ["get", "list", "watch", "patch", "update"] - - apiGroups: [""] - resources: ["pods"] - verbs: ["*"] - - apiGroups: ["apps"] - resources: ["daemonsets"] - verbs: ["get"] - apiGroups: [ "config.openshift.io" ] resources: [ "infrastructures" ] verbs: [ "get", "list", "watch" ] diff --git a/pkg/consts/constants.go b/pkg/consts/constants.go index 09e20cd89..c20239933 100644 --- a/pkg/consts/constants.go +++ b/pkg/consts/constants.go @@ -67,12 +67,17 @@ const ( MachineConfigPoolPausedAnnotationIdle = "Idle" MachineConfigPoolPausedAnnotationPaused = "Paused" + SriovDevicePluginEnabledLabel = "sriovnetwork.openshift.io/device-plugin" + SriovDevicePluginEnabledLabelEnabled = "Enabled" + SriovDevicePluginEnabledLabelDisabled = "Disabled" + NodeDrainAnnotation = "sriovnetwork.openshift.io/state" NodeStateDrainAnnotation = "sriovnetwork.openshift.io/desired-state" NodeStateDrainAnnotationCurrent = "sriovnetwork.openshift.io/current-state" DrainIdle = "Idle" DrainRequired = "Drain_Required" RebootRequired = 
"Reboot_Required" + DevicePluginResetRequired = "Device_Plugin_Reset_Required" Draining = "Draining" DrainComplete = "DrainComplete" diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index 0fd1ec2cb..5641c4ca1 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -9,7 +9,6 @@ import ( "time" "golang.org/x/time/rate" - "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" @@ -464,6 +463,27 @@ func (dn *Daemon) nodeStateSyncHandler() error { } } + // if we don't need to drain, and we are on idle we need to request the device plugin reset + if !reqDrain && utils.ObjectHasAnnotation(dn.desiredNodeState, + consts.NodeStateDrainAnnotationCurrent, + consts.DrainIdle) { + log.Log.Info("nodeStateSyncHandler(): apply 'Device_Plugin_Reset_Required' annotation for node") + err := utils.AnnotateNode(context.Background(), vars.NodeName, consts.NodeDrainAnnotation, consts.DevicePluginResetRequired, dn.client) + if err != nil { + log.Log.Error(err, "handleDrain(): Failed to annotate node") + return err + } + + log.Log.Info("handleDrain(): apply 'Device_Plugin_Reset_Required' annotation for nodeState") + if err := utils.AnnotateObject(context.Background(), dn.desiredNodeState, + consts.NodeStateDrainAnnotation, + consts.DevicePluginResetRequired, dn.client); err != nil { + return err + } + + return nil + } + // apply the vendor plugins after we are done with drain if needed for k, p := range dn.loadedPlugins { // Skip both the general and virtual plugin apply them last @@ -509,13 +529,6 @@ func (dn *Daemon) nodeStateSyncHandler() error { return nil } - // restart device plugin pod - log.Log.Info("nodeStateSyncHandler(): restart device plugin pod") - if err := dn.restartDevicePluginPod(); err != nil { - log.Log.Error(err, "nodeStateSyncHandler(): fail to restart device plugin pod") - return err - } - log.Log.Info("nodeStateSyncHandler(): apply 'Idle' 
annotation for node") err = utils.AnnotateNode(context.Background(), vars.NodeName, consts.NodeDrainAnnotation, consts.DrainIdle, dn.client) if err != nil { @@ -664,64 +677,6 @@ func (dn *Daemon) handleDrain(reqReboot bool) (bool, error) { return true, nil } -func (dn *Daemon) restartDevicePluginPod() error { - dn.mu.Lock() - defer dn.mu.Unlock() - log.Log.V(2).Info("restartDevicePluginPod(): try to restart device plugin pod") - - var podToDelete string - pods, err := dn.kubeClient.CoreV1().Pods(vars.Namespace).List(context.Background(), metav1.ListOptions{ - LabelSelector: "app=sriov-device-plugin", - FieldSelector: "spec.nodeName=" + vars.NodeName, - ResourceVersion: "0", - }) - if err != nil { - if errors.IsNotFound(err) { - log.Log.Info("restartDevicePluginPod(): device plugin pod exited") - return nil - } - log.Log.Error(err, "restartDevicePluginPod(): Failed to list device plugin pod, retrying") - return err - } - - if len(pods.Items) == 0 { - log.Log.Info("restartDevicePluginPod(): device plugin pod exited") - return nil - } - podToDelete = pods.Items[0].Name - - log.Log.V(2).Info("restartDevicePluginPod(): Found device plugin pod, deleting it", "pod-name", podToDelete) - err = dn.kubeClient.CoreV1().Pods(vars.Namespace).Delete(context.Background(), podToDelete, metav1.DeleteOptions{}) - if errors.IsNotFound(err) { - log.Log.Info("restartDevicePluginPod(): pod to delete not found") - return nil - } - if err != nil { - log.Log.Error(err, "restartDevicePluginPod(): Failed to delete device plugin pod, retrying") - return err - } - - if err := wait.PollImmediateUntil(3*time.Second, func() (bool, error) { - _, err := dn.kubeClient.CoreV1().Pods(vars.Namespace).Get(context.Background(), podToDelete, metav1.GetOptions{}) - if errors.IsNotFound(err) { - log.Log.Info("restartDevicePluginPod(): device plugin pod exited") - return true, nil - } - - if err != nil { - log.Log.Error(err, "restartDevicePluginPod(): Failed to check for device plugin exit, retrying") - } 
else { - log.Log.Info("restartDevicePluginPod(): waiting for device plugin pod to exit", "pod-name", podToDelete) - } - return false, nil - }, dn.stopCh); err != nil { - log.Log.Error(err, "restartDevicePluginPod(): failed to wait for checking pod deletion") - return err - } - - return nil -} - func (dn *Daemon) rebootNode() { log.Log.Info("rebootNode(): trigger node reboot") exit, err := dn.HostHelpers.Chroot(consts.Host) diff --git a/pkg/daemon/daemon_test.go b/pkg/daemon/daemon_test.go index a1e29dcb2..d88e9c126 100644 --- a/pkg/daemon/daemon_test.go +++ b/pkg/daemon/daemon_test.go @@ -107,19 +107,6 @@ var _ = Describe("Config Daemon", func() { }, } - SriovDevicePluginPod := corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "sriov-device-plugin-xxxx", - Namespace: vars.Namespace, - Labels: map[string]string{ - "app": "sriov-device-plugin", - }, - }, - Spec: corev1.PodSpec{ - NodeName: "test-node", - }, - } - err = sriovnetworkv1.AddToScheme(scheme.Scheme) Expect(err).ToNot(HaveOccurred()) kClient := kclient.NewClientBuilder().WithScheme(scheme.Scheme).WithRuntimeObjects(&corev1.Node{ @@ -130,7 +117,7 @@ var _ = Describe("Config Daemon", func() { Namespace: vars.Namespace, }}).Build() - kubeClient := fakek8s.NewSimpleClientset(&FakeSupportedNicIDs, &SriovDevicePluginPod) + kubeClient := fakek8s.NewSimpleClientset(&FakeSupportedNicIDs) snclient := snclientset.NewSimpleClientset() err = sriovnetworkv1.InitNicIDMapFromConfigMap(kubeClient, vars.Namespace) Expect(err).ToNot(HaveOccurred()) @@ -225,22 +212,15 @@ var _ = Describe("Config Daemon", func() { Eventually(refreshCh, "30s").Should(Receive(&msg)) Expect(msg.syncStatus).To(Equal("InProgress")) + nodeState.Annotations[consts.NodeStateDrainAnnotationCurrent] = consts.DrainComplete + err = updateSriovNetworkNodeState(sut.sriovClient, nodeState) + Expect(err).ToNot(HaveOccurred()) + + Eventually(refreshCh, "30s").Should(Receive(&msg)) + Expect(msg.syncStatus).To(Equal("InProgress")) Eventually(refreshCh, 
"30s").Should(Receive(&msg)) Expect(msg.syncStatus).To(Equal("Succeeded")) - Eventually(func() (int, error) { - podList, err := sut.kubeClient.CoreV1().Pods(vars.Namespace).List(context.Background(), metav1.ListOptions{ - LabelSelector: "app=sriov-device-plugin", - FieldSelector: "spec.nodeName=test-node", - }) - - if err != nil { - return 0, err - } - - return len(podList.Items), nil - }, "10s").Should(BeZero()) - }) It("ignore non latest SriovNetworkNodeState generations", func() { @@ -267,7 +247,7 @@ var _ = Describe("Config Daemon", func() { ObjectMeta: metav1.ObjectMeta{ Name: "test-node", Generation: 777, - Annotations: map[string]string{consts.NodeStateDrainAnnotationCurrent: consts.DrainIdle}, + Annotations: map[string]string{consts.NodeStateDrainAnnotationCurrent: consts.DrainComplete}, }, } Expect( diff --git a/pkg/drain/drainer.go b/pkg/drain/drainer.go index a3500dc47..4f35df5e4 100644 --- a/pkg/drain/drainer.go +++ b/pkg/drain/drainer.go @@ -31,6 +31,7 @@ func (w writer) Write(p []byte) (n int, err error) { type DrainInterface interface { DrainNode(context.Context, *corev1.Node, bool) (bool, error) CompleteDrainNode(context.Context, *corev1.Node) (bool, error) + RunCordonOrUncordon(ctx context.Context, node *corev1.Node, desired bool) error } type Drainer struct { @@ -98,7 +99,7 @@ func (d *Drainer) DrainNode(ctx context.Context, node *corev1.Node, fullNodeDrai reqLogger.Info("drainNode(): failed to drain node", "error", err) return false, err } - reqLogger.Info("drainNode(): drain complete") + reqLogger.Info("drainNode(): Drain completed") return true, nil } @@ -131,6 +132,22 @@ func (d *Drainer) CompleteDrainNode(ctx context.Context, node *corev1.Node) (boo return completed, nil } +// RunCordonOrUncordon runs cordon or uncordon on a specific +func (d *Drainer) RunCordonOrUncordon(ctx context.Context, node *corev1.Node, desired bool) error { + logger := log.FromContext(ctx) + logger.Info("RunCordonOrUncordon:()") + + // create drain helper we don't 
care about the drain function, so we send false for the fullDrain parameter + drainHelper := createDrainHelper(d.kubeClient, ctx, false) + + // perform the API call + err := drain.RunCordonOrUncordon(drainHelper, node, desired) + if err != nil { + logger.Error(err, "failed to cordon/uncordon node", "node", node.Name, "desired", desired) + } + return err +} + // createDrainHelper function to create a drain helper // if fullDrain is false we only remove pods that have the resourcePrefix // if not we remove all the pods in the node diff --git a/pkg/platforms/openshift/openshift.go b/pkg/platforms/openshift/openshift.go index 3f7d3421c..b55b9c70d 100644 --- a/pkg/platforms/openshift/openshift.go +++ b/pkg/platforms/openshift/openshift.go @@ -228,6 +228,18 @@ func (c *openshiftContext) OpenshiftAfterCompleteDrainNode(ctx context.Context, return false, err } + value, exist := mcp.Annotations[consts.MachineConfigPoolPausedAnnotation] + // if the annotation doesn't exist we just return true here + // this can be a case where the node was moved to another MCP in the time we start the drain + if !exist { + return true, nil + } + // check if the sriov annotation on mcp is idle + // if the value is idle we just return here + if value == consts.MachineConfigPoolPausedAnnotationIdle { + return true, nil + } + // get all the nodes that belong to this machine config pool to validate this is the last node // request to complete the drain nodesInPool := &corev1.NodeList{} diff --git a/pkg/utils/cluster.go b/pkg/utils/cluster.go index 6f8d72e07..4065ece35 100644 --- a/pkg/utils/cluster.go +++ b/pkg/utils/cluster.go @@ -131,7 +131,8 @@ func AnnotateObject(ctx context.Context, obj client.Object, key, value string, c log.Log.V(2).Info("AnnotateObject(): Annotate object", "objectName", obj.GetName(), "objectKind", obj.GetObjectKind(), - "annotation", value) + "annotationKey", key, + "annotationValue", value) newObj := obj.DeepCopyObject().(client.Object) if newObj.GetAnnotations() == nil {
newObj.SetAnnotations(map[string]string{}) @@ -161,3 +162,40 @@ func AnnotateNode(ctx context.Context, nodeName string, key, value string, c cli return AnnotateObject(ctx, node, key, value, c) } + +// LabelObject adds a label to a Kubernetes object +func LabelObject(ctx context.Context, obj client.Object, key, value string, c client.Client) error { + log.Log.V(2).Info("LabelObject(): Label object", + "objectName", obj.GetName(), + "objectKind", obj.GetObjectKind(), + "labelKey", key, + "labelValue", value) + newObj := obj.DeepCopyObject().(client.Object) + if newObj.GetLabels() == nil { + newObj.SetLabels(map[string]string{}) + } + + if newObj.GetLabels()[key] != value { + newObj.GetLabels()[key] = value + patch := client.MergeFrom(obj) + err := c.Patch(ctx, + newObj, patch) + if err != nil { + log.Log.Error(err, "labelObject(): Failed to patch object") + return err + } + } + + return nil + } + +// LabelNode adds a label to a node +func LabelNode(ctx context.Context, nodeName string, key, value string, c client.Client) error { + node := &corev1.Node{} + err := c.Get(context.TODO(), client.ObjectKey{Name: nodeName}, node) + if err != nil { + return err + } + + return LabelObject(ctx, node, key, value, c) +}