From 76c70f273fdcf447eec17ad336b3a1c2fd6499d5 Mon Sep 17 00:00:00 2001 From: Xuzheng Chang Date: Mon, 14 Aug 2023 14:38:05 +0800 Subject: [PATCH] Add featuregates for volcano capabilities Signed-off-by: Xuzheng Chang --- pkg/controllers/job/job_controller.go | 77 +++++++++++-------- pkg/controllers/podgroup/pg_controller.go | 17 ++-- pkg/controllers/queue/queue_controller.go | 36 +++++---- pkg/features/volcano_features.go | 57 ++++++++++++++ pkg/scheduler/cache/cache.go | 32 ++++---- .../volumebinding/volume_binding.go | 4 +- .../volumebinding/volume_binding_test.go | 8 +- 7 files changed, 158 insertions(+), 73 deletions(-) create mode 100644 pkg/features/volcano_features.go diff --git a/pkg/controllers/job/job_controller.go b/pkg/controllers/job/job_controller.go index 6d7a5b1171..45ebccc5f6 100644 --- a/pkg/controllers/job/job_controller.go +++ b/pkg/controllers/job/job_controller.go @@ -24,6 +24,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" kubeschedulinginformers "k8s.io/client-go/informers/scheduling/v1" @@ -48,10 +49,12 @@ import ( batchlister "volcano.sh/apis/pkg/client/listers/batch/v1alpha1" buslister "volcano.sh/apis/pkg/client/listers/bus/v1alpha1" schedulinglisters "volcano.sh/apis/pkg/client/listers/scheduling/v1beta1" + "volcano.sh/volcano/pkg/controllers/apis" jobcache "volcano.sh/volcano/pkg/controllers/cache" "volcano.sh/volcano/pkg/controllers/framework" "volcano.sh/volcano/pkg/controllers/job/state" + "volcano.sh/volcano/pkg/features" ) func init() { @@ -151,39 +154,43 @@ func (cc *jobcontroller) Initialize(opt *framework.ControllerOption) error { factory := informerfactory.NewSharedInformerFactory(cc.vcClient, 0) cc.vcInformerFactory = factory - cc.jobInformer = factory.Batch().V1alpha1().Jobs() - cc.jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: 
cc.addJob, - UpdateFunc: cc.updateJob, - DeleteFunc: cc.deleteJob, - }) - cc.jobLister = cc.jobInformer.Lister() - cc.jobSynced = cc.jobInformer.Informer().HasSynced - - cc.cmdInformer = factory.Bus().V1alpha1().Commands() - cc.cmdInformer.Informer().AddEventHandler( - cache.FilteringResourceEventHandler{ - FilterFunc: func(obj interface{}) bool { - switch v := obj.(type) { - case *busv1alpha1.Command: - if v.TargetObject != nil && - v.TargetObject.APIVersion == batchv1alpha1.SchemeGroupVersion.String() && - v.TargetObject.Kind == "Job" { - return true - } + if utilfeature.DefaultFeatureGate.Enabled(features.WorkLoadSupport) { + cc.jobInformer = factory.Batch().V1alpha1().Jobs() + cc.jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: cc.addJob, + UpdateFunc: cc.updateJob, + DeleteFunc: cc.deleteJob, + }) + cc.jobLister = cc.jobInformer.Lister() + cc.jobSynced = cc.jobInformer.Informer().HasSynced + } - return false - default: - return false - } - }, - Handler: cache.ResourceEventHandlerFuncs{ - AddFunc: cc.addCommand, + if utilfeature.DefaultFeatureGate.Enabled(features.QueueCommandSync) { + cc.cmdInformer = factory.Bus().V1alpha1().Commands() + cc.cmdInformer.Informer().AddEventHandler( + cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + switch v := obj.(type) { + case *busv1alpha1.Command: + if v.TargetObject != nil && + v.TargetObject.APIVersion == batchv1alpha1.SchemeGroupVersion.String() && + v.TargetObject.Kind == "Job" { + return true + } + + return false + default: + return false + } + }, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: cc.addCommand, + }, }, - }, - ) - cc.cmdLister = cc.cmdInformer.Lister() - cc.cmdSynced = cc.cmdInformer.Informer().HasSynced + ) + cc.cmdLister = cc.cmdInformer.Lister() + cc.cmdSynced = cc.cmdInformer.Informer().HasSynced + } cc.podInformer = sharedInformers.Core().V1().Pods() cc.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ 
-210,9 +217,11 @@ func (cc *jobcontroller) Initialize(opt *framework.ControllerOption) error { cc.pgLister = cc.pgInformer.Lister() cc.pgSynced = cc.pgInformer.Informer().HasSynced - cc.pcInformer = sharedInformers.Scheduling().V1().PriorityClasses() - cc.pcLister = cc.pcInformer.Lister() - cc.pcSynced = cc.pcInformer.Informer().HasSynced + if utilfeature.DefaultFeatureGate.Enabled(features.PriorityClass) { + cc.pcInformer = sharedInformers.Scheduling().V1().PriorityClasses() + cc.pcLister = cc.pcInformer.Lister() + cc.pcSynced = cc.pcInformer.Informer().HasSynced + } cc.queueInformer = factory.Scheduling().V1beta1().Queues() cc.queueLister = cc.queueInformer.Lister() diff --git a/pkg/controllers/podgroup/pg_controller.go b/pkg/controllers/podgroup/pg_controller.go index bb0272e118..395bfecc25 100644 --- a/pkg/controllers/podgroup/pg_controller.go +++ b/pkg/controllers/podgroup/pg_controller.go @@ -18,6 +18,7 @@ package podgroup import ( "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" appinformers "k8s.io/client-go/informers/apps/v1" coreinformers "k8s.io/client-go/informers/core/v1" @@ -34,6 +35,7 @@ import ( schedulinginformer "volcano.sh/apis/pkg/client/informers/externalversions/scheduling/v1beta1" schedulinglister "volcano.sh/apis/pkg/client/listers/scheduling/v1beta1" "volcano.sh/volcano/pkg/controllers/framework" + "volcano.sh/volcano/pkg/features" commonutil "volcano.sh/volcano/pkg/util" ) @@ -103,13 +105,14 @@ func (pg *pgcontroller) Initialize(opt *framework.ControllerOption) error { pg.pgLister = pg.pgInformer.Lister() pg.pgSynced = pg.pgInformer.Informer().HasSynced - pg.rsInformer = pg.informerFactory.Apps().V1().ReplicaSets() - pg.rsSynced = pg.rsInformer.Informer().HasSynced - pg.rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: pg.addReplicaSet, - UpdateFunc: pg.updateReplicaSet, - }) - + if 
utilfeature.DefaultFeatureGate.Enabled(features.WorkLoadSupport) { + pg.rsInformer = pg.informerFactory.Apps().V1().ReplicaSets() + pg.rsSynced = pg.rsInformer.Informer().HasSynced + pg.rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: pg.addReplicaSet, + UpdateFunc: pg.updateReplicaSet, + }) + } return nil } diff --git a/pkg/controllers/queue/queue_controller.go b/pkg/controllers/queue/queue_controller.go index ffdc252df1..1f38bc78a2 100644 --- a/pkg/controllers/queue/queue_controller.go +++ b/pkg/controllers/queue/queue_controller.go @@ -27,6 +27,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/kubernetes" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/cache" @@ -46,6 +47,7 @@ import ( "volcano.sh/volcano/pkg/controllers/apis" "volcano.sh/volcano/pkg/controllers/framework" queuestate "volcano.sh/volcano/pkg/controllers/queue/state" + "volcano.sh/volcano/pkg/features" ) func init() { @@ -137,22 +139,24 @@ func (c *queuecontroller) Initialize(opt *framework.ControllerOption) error { DeleteFunc: c.deletePodGroup, }) - c.cmdInformer = factory.Bus().V1alpha1().Commands() - c.cmdInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ - FilterFunc: func(obj interface{}) bool { - switch v := obj.(type) { - case *busv1alpha1.Command: - return IsQueueReference(v.TargetObject) - default: - return false - } - }, - Handler: cache.ResourceEventHandlerFuncs{ - AddFunc: c.addCommand, - }, - }) - c.cmdLister = c.cmdInformer.Lister() - c.cmdSynced = c.cmdInformer.Informer().HasSynced + if utilfeature.DefaultFeatureGate.Enabled(features.QueueCommandSync) { + c.cmdInformer = factory.Bus().V1alpha1().Commands() + c.cmdInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + switch v := 
obj.(type) { + case *busv1alpha1.Command: + return IsQueueReference(v.TargetObject) + default: + return false + } + }, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: c.addCommand, + }, + }) + c.cmdLister = c.cmdInformer.Lister() + c.cmdSynced = c.cmdInformer.Informer().HasSynced + } queuestate.SyncQueue = c.syncQueue queuestate.OpenQueue = c.openQueue diff --git a/pkg/features/volcano_features.go b/pkg/features/volcano_features.go new file mode 100644 index 0000000000..57055f21ec --- /dev/null +++ b/pkg/features/volcano_features.go @@ -0,0 +1,57 @@ +/* + Copyright 2023 The Volcano Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package features + +import ( + "k8s.io/apimachinery/pkg/util/runtime" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/component-base/featuregate" +) + +const ( + // WorkLoadSupport can cache and operate on **K8s native resources**; currently Deployment/ReplicaSet/ReplicationController/StatefulSet resources. + WorkLoadSupport featuregate.Feature = "WorkLoadSupport" + + // VolcanoJobSupport can identify and schedule volcano jobs. + VolcanoJobSupport featuregate.Feature = "VolcanoJobSupport" + + // QueueCommandSync supports queue command sync. + QueueCommandSync featuregate.Feature = "QueueCommandSync" + + // PriorityClass provides the capability of preemption at the pod group level.
+ PriorityClass featuregate.Feature = "PriorityClass" + + // CSIStorage enables tracking of the available storage capacity that CSI drivers provide. + CSIStorage featuregate.Feature = "CSIStorage" + + // ResourceTopology supports topology awareness for resources such as cpu/memory. + ResourceTopology featuregate.Feature = "ResourceTopology" +) + +func init() { + runtime.Must(utilfeature.DefaultMutableFeatureGate.Add(defaultVolcanoFeatureGates)) +} + +var defaultVolcanoFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ + WorkLoadSupport: {Default: true, PreRelease: featuregate.Alpha}, + VolcanoJobSupport: {Default: true, PreRelease: featuregate.Alpha}, + QueueCommandSync: {Default: true, PreRelease: featuregate.Alpha}, + PriorityClass: {Default: true, PreRelease: featuregate.Alpha}, + // CSIStorage is explicitly set to false by default. + CSIStorage: {Default: false, PreRelease: featuregate.Alpha}, + ResourceTopology: {Default: true, PreRelease: featuregate.Alpha}, +} diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 1eb1558287..68c5f840f0 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -33,6 +33,7 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" infov1 "k8s.io/client-go/informers/core/v1" schedv1 "k8s.io/client-go/informers/scheduling/v1" @@ -60,6 +61,7 @@ import ( vcinformerv1 "volcano.sh/apis/pkg/client/informers/externalversions/scheduling/v1beta1" "volcano.sh/volcano/cmd/scheduler/app/options" + "volcano.sh/volcano/pkg/features" schedulingapi "volcano.sh/volcano/pkg/scheduler/api" volumescheduling "volcano.sh/volcano/pkg/scheduler/capabilities/volumebinding" "volcano.sh/volcano/pkg/scheduler/metrics" @@ -501,9 +503,11 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu // `SelectorSpread` and `PodTopologySpread`
plugins uses the following four so far. informerFactory.Core().V1().Namespaces().Informer() informerFactory.Core().V1().Services().Informer() - informerFactory.Core().V1().ReplicationControllers().Informer() - informerFactory.Apps().V1().ReplicaSets().Informer() - informerFactory.Apps().V1().StatefulSets().Informer() + if utilfeature.DefaultFeatureGate.Enabled(features.WorkLoadSupport) { + informerFactory.Core().V1().ReplicationControllers().Informer() + informerFactory.Apps().V1().ReplicaSets().Informer() + informerFactory.Apps().V1().StatefulSets().Informer() + } // create informer for node information sc.nodeInformer = informerFactory.Core().V1().Nodes() @@ -561,11 +565,11 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu DeleteFunc: sc.DeleteCSINode, }, ) - sc.csiDriverInformer = informerFactory.Storage().V1().CSIDrivers() - sc.csiStorageCapacityInformer = informerFactory.Storage().V1beta1().CSIStorageCapacities() var capacityCheck *volumescheduling.CapacityCheck - if options.ServerOpts.EnableCSIStorage { + if options.ServerOpts.EnableCSIStorage && utilfeature.DefaultFeatureGate.Enabled(features.CSIStorage) { + sc.csiDriverInformer = informerFactory.Storage().V1().CSIDrivers() + sc.csiStorageCapacityInformer = informerFactory.Storage().V1beta1().CSIStorageCapacities() capacityCheck = &volumescheduling.CapacityCheck{ CSIDriverInformer: sc.csiDriverInformer, CSIStorageCapacityInformer: sc.csiStorageCapacityInformer, @@ -621,7 +625,7 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu }, }) - if options.ServerOpts.EnablePriorityClass { + if options.ServerOpts.EnablePriorityClass && utilfeature.DefaultFeatureGate.Enabled(features.PriorityClass) { sc.pcInformer = informerFactory.Scheduling().V1().PriorityClasses() sc.pcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: sc.AddPriorityClass, @@ -677,12 +681,14 @@ func newSchedulerCache(config *rest.Config, schedulerNames 
[]string, defaultQueu DeleteFunc: sc.DeleteQueueV1beta1, }) - sc.cpuInformer = vcinformers.Nodeinfo().V1alpha1().Numatopologies() - sc.cpuInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: sc.AddNumaInfoV1alpha1, - UpdateFunc: sc.UpdateNumaInfoV1alpha1, - DeleteFunc: sc.DeleteNumaInfoV1alpha1, - }) + if utilfeature.DefaultFeatureGate.Enabled(features.ResourceTopology) { + sc.cpuInformer = vcinformers.Nodeinfo().V1alpha1().Numatopologies() + sc.cpuInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: sc.AddNumaInfoV1alpha1, + UpdateFunc: sc.UpdateNumaInfoV1alpha1, + DeleteFunc: sc.DeleteNumaInfoV1alpha1, + }) + } return sc } diff --git a/pkg/scheduler/capabilities/volumebinding/volume_binding.go b/pkg/scheduler/capabilities/volumebinding/volume_binding.go index 5edcb651c6..9fcfadc3d5 100644 --- a/pkg/scheduler/capabilities/volumebinding/volume_binding.go +++ b/pkg/scheduler/capabilities/volumebinding/volume_binding.go @@ -26,6 +26,7 @@ import ( v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" + utilfeature "k8s.io/apiserver/pkg/util/feature" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/component-helpers/storage/ephemeral" "k8s.io/klog/v2" @@ -37,6 +38,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" "volcano.sh/volcano/cmd/scheduler/app/options" + "volcano.sh/volcano/pkg/features" ) const ( @@ -377,7 +379,7 @@ func New(plArgs runtime.Object, fh framework.Handle, fts feature.Features) (fram storageClassInformer := fh.SharedInformerFactory().Storage().V1().StorageClasses() csiNodeInformer := fh.SharedInformerFactory().Storage().V1().CSINodes() var capacityCheck *CapacityCheck - if options.ServerOpts.EnableCSIStorage { + if options.ServerOpts.EnableCSIStorage && utilfeature.DefaultFeatureGate.Enabled(features.CSIStorage) { capacityCheck = &CapacityCheck{ CSIDriverInformer: fh.SharedInformerFactory().Storage().V1().CSIDrivers(), 
CSIStorageCapacityInformer: fh.SharedInformerFactory().Storage().V1beta1().CSIStorageCapacities(), diff --git a/pkg/scheduler/capabilities/volumebinding/volume_binding_test.go b/pkg/scheduler/capabilities/volumebinding/volume_binding_test.go index 8f32dc3476..3cfe2ed150 100644 --- a/pkg/scheduler/capabilities/volumebinding/volume_binding_test.go +++ b/pkg/scheduler/capabilities/volumebinding/volume_binding_test.go @@ -21,8 +21,6 @@ import ( "reflect" "testing" - "volcano.sh/volcano/cmd/scheduler/app/options" - "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" v1 "k8s.io/api/core/v1" @@ -30,12 +28,17 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" + featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/runtime" + + "volcano.sh/volcano/cmd/scheduler/app/options" + "volcano.sh/volcano/pkg/features" ) var ( @@ -596,6 +599,7 @@ func TestVolumeBinding(t *testing.T) { options.ServerOpts = &options.ServerOption{ EnableCSIStorage: true, } + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIStorage, true)() for _, item := range table { t.Run(item.name, func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background())