diff --git a/cmd/glbc/main.go b/cmd/glbc/main.go
index 9b6e326a06..1d23dc0548 100644
--- a/cmd/glbc/main.go
+++ b/cmd/glbc/main.go
@@ -28,7 +28,6 @@ import (
 	"k8s.io/klog"
 
 	crdclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
-	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/kubernetes"
 	clientset "k8s.io/client-go/kubernetes"
 	restclient "k8s.io/client-go/rest"
@@ -79,13 +78,7 @@ func main() {
 	if err != nil {
 		klog.Fatalf("Failed to create kubernetes client: %v", err)
 	}
-	var dynamicClient dynamic.Interface
-	if flags.F.EnableCSM {
-		dynamicClient, err = dynamic.NewForConfig(kubeConfig)
-		if err != nil {
-			klog.Fatalf("Failed to create kubernetes dynamic client: %v", err)
-		}
-	}
+
 	// Due to scaling issues, leader election must be configured with a separate k8s client.
 	leaderElectKubeClient, err := kubernetes.NewForConfig(restclient.AddUserAgent(kubeConfig, "leader-election"))
 	if err != nil {
@@ -139,9 +132,11 @@ func main() {
 		HealthCheckPath:               flags.F.HealthCheckPath,
 		DefaultBackendHealthCheckPath: flags.F.DefaultSvcHealthCheckPath,
 		FrontendConfigEnabled:         flags.F.EnableFrontendConfig,
-		EnableCSM:                     flags.F.EnableCSM,
+		EnableASMConfigMap:            flags.F.EnableASMConfigMapBasedConfig,
+		ASMConfigMapNamespace:         flags.F.ASMConfigMapBasedConfigNamespace,
+		ASMConfigMapName:              flags.F.ASMConfigMapBasedConfigCMName,
 	}
-	ctx := ingctx.NewControllerContext(kubeClient, dynamicClient, backendConfigClient, frontendConfigClient, cloud, namer, ctxConfig)
+	ctx := ingctx.NewControllerContext(kubeConfig, kubeClient, backendConfigClient, frontendConfigClient, cloud, namer, ctxConfig)
 	go app.RunHTTPServer(ctx.HealthCheck)
 
 	if !flags.F.LeaderElection.LeaderElect {
@@ -149,18 +144,17 @@ func main() {
 		return
 	}
 
-	electionConfig, err := makeLeaderElectionConfig(leaderElectKubeClient, ctx.Recorder(flags.F.LeaderElection.LockObjectNamespace), func() {
-		runControllers(ctx)
-	})
+	electionConfig, err := makeLeaderElectionConfig(ctx, leaderElectKubeClient, ctx.Recorder(flags.F.LeaderElection.LockObjectNamespace))
 	if err != nil {
 		klog.Fatalf("%v", err)
 	}
 	leaderelection.RunOrDie(context.Background(), *electionConfig)
+	klog.Warning("Ingress Controller exited.")
 }
 
 // makeLeaderElectionConfig builds a leader election configuration. It will
 // create a new resource lock associated with the configuration.
-func makeLeaderElectionConfig(client clientset.Interface, recorder record.EventRecorder, run func()) (*leaderelection.LeaderElectionConfig, error) {
+func makeLeaderElectionConfig(ctx *ingctx.ControllerContext, client clientset.Interface, recorder record.EventRecorder) (*leaderelection.LeaderElectionConfig, error) {
 	hostname, err := os.Hostname()
 	if err != nil {
 		return nil, fmt.Errorf("unable to get hostname: %v", err)
 	}
@@ -180,6 +174,12 @@ func makeLeaderElectionConfig(client clientset.Interface, recorder record.EventR
 		return nil, fmt.Errorf("couldn't create resource lock: %v", err)
 	}
 
+	run := func() {
+		runControllers(ctx)
+		klog.Info("Shutting down leader election")
+		os.Exit(0)
+	}
+
 	return &leaderelection.LeaderElectionConfig{
 		Lock:          rl,
 		LeaseDuration: flags.F.LeaderElection.LeaseDuration.Duration,
@@ -192,7 +192,7 @@ func makeLeaderElectionConfig(client clientset.Interface, recorder record.EventR
 				run()
 			},
 			OnStoppedLeading: func() {
-				klog.Fatalf("lost master")
+				klog.Warning("lost master")
 			},
 		},
 	}, nil
@@ -200,12 +200,18 @@ func makeLeaderElectionConfig(client clientset.Interface, recorder record.EventR
 }
 
 func runControllers(ctx *ingctx.ControllerContext) {
 	stopCh := make(chan struct{})
+	ctx.Init()
 	lbc := controller.NewLoadBalancerController(ctx, stopCh)
+	if ctx.EnableASMConfigMap {
+		ctx.ASMConfigController.RegisterInformer(ctx.ConfigMapInformer, func() {
+			lbc.Stop(false) // We want to trigger a restart; no need to clean up all the resources.
+		})
+	}
 
 	fwc := firewalls.NewFirewallController(ctx, flags.F.NodePortRanges.Values())
 
 	// TODO: Refactor NEG to use cloud mocks so ctx.Cloud can be referenced within NewController.
-	negController := neg.NewController(negtypes.NewAdapter(ctx.Cloud), ctx, lbc.Translator, ctx.ClusterNamer, flags.F.ResyncPeriod, flags.F.NegGCPeriod, flags.F.EnableReadinessReflector, flags.F.EnableCSM, flags.F.CSMServiceNEGSkipNamespaces)
+	negController := neg.NewController(negtypes.NewAdapter(ctx.Cloud), ctx, lbc.Translator, ctx.ClusterNamer, flags.F.ResyncPeriod, flags.F.NegGCPeriod, flags.F.EnableReadinessReflector)
 	go negController.Run(stopCh)
 	klog.V(0).Infof("negController started")
@@ -220,7 +226,10 @@ func runControllers(ctx *ingctx.ControllerContext) {
 	lbc.Run()
 
 	for {
-		klog.Infof("Handled quit, awaiting pod deletion.")
+		klog.Warning("Handled quit, awaiting pod deletion.")
 		time.Sleep(30 * time.Second)
+		if ctx.EnableASMConfigMap {
+			return
+		}
 	}
 }
diff --git a/docs/deploy/resources/configmap-based-config.yaml b/docs/deploy/resources/configmap-based-config.yaml
new file mode 100644
index 0000000000..00badaa8d3
--- /dev/null
+++ b/docs/deploy/resources/configmap-based-config.yaml
@@ -0,0 +1,8 @@
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: ingress-controller-asm-cm-config
+  namespace: kube-system
+data:
+  enable-asm: "false"
+  asm-skip-namespaces: "kube-system,istio-system"
diff --git a/pkg/cmconfig/config.go b/pkg/cmconfig/config.go
new file mode 100644
index 0000000000..d401478a5b
--- /dev/null
+++ b/pkg/cmconfig/config.go
@@ -0,0 +1,54 @@
+package cmconfig
+
+import (
+	"fmt"
+	"reflect"
+	"strings"
+
+	utilerrors "k8s.io/apimachinery/pkg/util/errors"
+)
+
+// Config holds configmap based configurations.
+type Config struct {
+	EnableASM                   bool
+	ASMServiceNEGSkipNamespaces []string
+}
+
+const (
+	trueValue  = "true"
+	falseValue = "false"
+
+	enableASM         = "enable-asm"
+	asmSkipNamespaces = "asm-skip-namespaces"
+)
+
+// NewConfig returns a Config instance with default values.
+func NewConfig() Config {
+	return Config{ASMServiceNEGSkipNamespaces: []string{"kube-system"}}
+}
+
+// Equals returns true if c equals to other.
+func (c *Config) Equals(other *Config) bool {
+	return reflect.DeepEqual(c, other)
+}
+
+// LoadValue loads configs from a map, ignoring any unknown or invalid fields.
+func (c *Config) LoadValue(m map[string]string) error {
+	var errList []error
+	for k, v := range m {
+		if k == enableASM {
+			if v == trueValue {
+				c.EnableASM = true
+			} else if v == falseValue {
+				c.EnableASM = false
+			} else {
+				errList = append(errList, fmt.Errorf("The map provided an invalid value for field: %s, value: %s, valid values are: %s/%s", k, v, trueValue, falseValue))
+			}
+		} else if k == asmSkipNamespaces {
+			c.ASMServiceNEGSkipNamespaces = strings.Split(v, ",")
+		} else {
+			errList = append(errList, fmt.Errorf("The map contains an unknown key-value pair: %s:%s", k, v))
+		}
+	}
+	return utilerrors.NewAggregate(errList)
+}
diff --git a/pkg/cmconfig/config_test.go b/pkg/cmconfig/config_test.go
new file mode 100644
index 0000000000..c183cef0e6
--- /dev/null
+++ b/pkg/cmconfig/config_test.go
@@ -0,0 +1,72 @@
+package cmconfig
+
+import (
+	"reflect"
+	"strings"
+	"testing"
+)
+
+func TestLoadValue(t *testing.T) {
+	testcases := []struct {
+		desc       string
+		inputMap   map[string]string
+		wantConfig Config
+		wantLog    string
+	}{
+		{
+			desc:       "empty map should give default config",
+			inputMap:   map[string]string{},
+			wantConfig: NewConfig(),
+			wantLog:    "",
+		},
+		{
+			desc:       "LoadValue should load values from a valid map",
+			inputMap:   map[string]string{"enable-asm": "true", "asm-skip-namespaces": "name-space1,namespace2"},
+			wantConfig: Config{EnableASM: true, ASMServiceNEGSkipNamespaces: []string{"name-space1", "namespace2"}},
+			wantLog:    "",
+		},
+		{
+			desc:       "LoadValue should return the default value if EnableASM has an invalid value.",
+			inputMap:   map[string]string{"enable-asm": "f"},
+			wantConfig: Config{EnableASM: false, ASMServiceNEGSkipNamespaces: []string{"kube-system"}},
+			wantLog:    "The map provided an invalid value for field: enable-asm, value: f, valid values are: true/false",
+		},
+		{
+			desc:       "LoadValue should be tolerant of unknown fields.",
+			inputMap:   map[string]string{"A": "B"},
+			wantConfig: NewConfig(),
+			wantLog:    "The map contains an unknown key-value pair: A:B",
+		},
+	}
+
+	for _, tc := range testcases {
+		t.Run(tc.desc, func(t *testing.T) {
+			config := NewConfig()
+			err := config.LoadValue(tc.inputMap)
+			if !config.Equals(&tc.wantConfig) {
+				t.Errorf("LoadValue loads wrong value, got: %v, want: %v", config, tc.wantConfig)
+			}
+			if tc.wantLog != "" {
+				if !strings.Contains(err.Error(), tc.wantLog) {
+					t.Errorf("LoadValue errors don't contain the wanted message, got: %s, want: %s", err.Error(), tc.wantLog)
+				}
+			}
+		})
+	}
+}
+
+func TestConfigTag(t *testing.T) {
+	configType := reflect.TypeOf(Config{})
+	for i := 0; i < configType.NumField(); i++ {
+		field := configType.Field(i)
+		fieldType := field.Type.Kind()
+		if fieldType != reflect.Bool && fieldType != reflect.String && fieldType != reflect.Slice {
+			t.Errorf("Struct Config contains a field with unsupported type: %s, only supports: %s/%s/[]string types", fieldType, reflect.Bool.String(), reflect.String.String())
+		}
+		if fieldType == reflect.Slice {
+			if field.Type.Elem().Kind() != reflect.String {
+				t.Errorf("Struct Config contains a slice field with unsupported element type: %s, only supports []string slices", field.Type.Elem().Kind())
+			}
+		}
+	}
+}
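Reviewer note: the two files above are the whole surface of the new config package. `NewConfig` supplies the defaults, `LoadValue` overlays a ConfigMap's `data` map, and `Equals` is what the controller later uses to decide whether a change warrants a restart. A minimal sketch of that flow, assuming only the identifiers introduced above (the `main` wrapper is illustrative, not part of this PR; the map literal mirrors docs/deploy/resources/configmap-based-config.yaml):

```go
package main

import (
	"fmt"

	"k8s.io/ingress-gce/pkg/cmconfig"
)

func main() {
	cfg := cmconfig.NewConfig() // defaults: EnableASM=false, skip NEGs in "kube-system"

	// Data as it would arrive from the ConfigMap shipped in this PR.
	data := map[string]string{
		"enable-asm":          "true",
		"asm-skip-namespaces": "kube-system,istio-system",
	}

	// Unknown keys and invalid values are collected into an aggregate error,
	// but every recognized key is still applied.
	if err := cfg.LoadValue(data); err != nil {
		fmt.Println("config warnings:", err)
	}
	fmt.Printf("EnableASM=%v skip=%v\n", cfg.EnableASM, cfg.ASMServiceNEGSkipNamespaces)
}
```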
diff --git a/pkg/cmconfig/controller.go b/pkg/cmconfig/controller.go
new file mode 100644
index 0000000000..53ec9aa997
--- /dev/null
+++ b/pkg/cmconfig/controller.go
@@ -0,0 +1,118 @@
+package cmconfig
+
+import (
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/record"
+	"k8s.io/klog"
+)
+
+// ConfigMapConfigController is the ConfigMap based config controller.
+// If the ConfigMap based config mode is enabled, it loads the config from configmap: configMapNamespace/configMapName and restarts the Ingress controller whenever the config legally changes.
+// If the mode is disabled, it returns the default values for the configs.
+type ConfigMapConfigController struct {
+	configMapNamespace     string
+	configMapName          string
+	currentConfig          *Config
+	currentConfigMapObject *v1.ConfigMap
+	kubeClient             kubernetes.Interface
+	recorder               record.EventRecorder
+}
+
+// NewConfigMapConfigController creates a new ConfigMapConfigController, it will load the config from the target configmap
+func NewConfigMapConfigController(kubeClient kubernetes.Interface, recorder record.EventRecorder, configMapNamespace, configMapName string) *ConfigMapConfigController {
+
+	currentConfig := NewConfig()
+	cm, err := kubeClient.CoreV1().ConfigMaps(configMapNamespace).Get(configMapName, metav1.GetOptions{})
+	if err != nil {
+		if errors.IsNotFound(err) {
+			klog.Infof("ConfigMapConfigController: ConfigMap based config not found, using default config: %v", currentConfig)
+		} else {
+			klog.Warningf("ConfigMapConfigController failed to load config from api server, using the default config. Error: %v", err)
+		}
+	} else {
+		if err := currentConfig.LoadValue(cm.Data); err != nil {
+			if recorder != nil {
+				recorder.Event(cm, "Warning", "LoadValueError", err.Error())
+			}
+			klog.Warningf("LoadValue error: %s", err.Error())
+		}
+		klog.Infof("ConfigMapConfigController: loaded config from configmap, config %v", currentConfig)
+	}
+
+	c := &ConfigMapConfigController{
+		configMapNamespace: configMapNamespace,
+		configMapName:      configMapName,
+		currentConfig:      &currentConfig,
+		kubeClient:         kubeClient,
+		recorder:           recorder,
+	}
+	return c
+}
+
+// GetConfig returns the internal Config
+func (c *ConfigMapConfigController) GetConfig() Config {
+	return *c.currentConfig
+}
+
+// RecordEvent records an event on the ASM ConfigMap
+func (c *ConfigMapConfigController) RecordEvent(eventtype, reason, message string) bool {
+	if c.recorder == nil || c.currentConfigMapObject == nil {
+		return false
+	}
+	c.recorder.Event(c.currentConfigMapObject, eventtype, reason, message)
+	return true
+}
+
+// RegisterInformer registers the ConfigMap based config controller handler with the given configMapInformer, which
+// watches the target ConfigMap and calls the cancel function if any valid change is detected.
+func (c *ConfigMapConfigController) RegisterInformer(configMapInformer cache.SharedIndexInformer, cancel func()) {
+	configMapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			c.processItem(obj, cancel)
+		},
+		DeleteFunc: func(obj interface{}) {
+			c.processItem(obj, cancel)
+		},
+		UpdateFunc: func(_, cur interface{}) {
+			c.processItem(cur, cancel)
+		},
+	})
+
+}
+
+func (c *ConfigMapConfigController) processItem(obj interface{}, cancel func()) {
+	configMap, ok := obj.(*v1.ConfigMap)
+	if !ok {
+		klog.Errorf("ConfigMapConfigController: failed to convert informer object to ConfigMap.")
+		return
+	}
+	if configMap.Namespace != c.configMapNamespace || configMap.Name != c.configMapName {
+		return
+	}
+
+	config := NewConfig()
+	cm, err := c.kubeClient.CoreV1().ConfigMaps(c.configMapNamespace).Get(c.configMapName, metav1.GetOptions{})
+	if err != nil {
+		if errors.IsNotFound(err) {
+			klog.Infof("ConfigMapConfigController: ConfigMap based config not found, using default config: %v", config)
+		} else {
+			klog.Warningf("ConfigMapConfigController failed to load config from api server, using the default config. Error: %v", err)
+		}
+	} else {
+		c.currentConfigMapObject = cm
+		if err := config.LoadValue(cm.Data); err != nil {
+			c.RecordEvent("Warning", "LoadValueError", err.Error())
+			klog.Warningf("LoadValue error: %s", err.Error())
+		}
+	}
+
+	if !config.Equals(c.currentConfig) {
+		klog.Warningf("ConfigMapConfigController: detected an update to the ConfigMapConfig. Old config: %v, new config: %v. Restarting Ingress controller...", *c.currentConfig, config)
+		c.RecordEvent("Normal", "ASMConfigMapTriggerRestart", "ConfigMapConfigController: detected an update to the ConfigMapConfig, restarting Ingress controller")
+		cancel()
+	}
+}
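The controller above is deliberately passive: it watches a single ConfigMap and, on any legal change, invokes whatever cancel callback the caller registered. A sketch of standalone wiring under the assumption that the caller supplies `kubeClient` and `stopCh` (a nil recorder disables event recording, exactly as in the unit tests below):

```go
package sketch

import (
	"time"

	informerv1 "k8s.io/client-go/informers/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/ingress-gce/pkg/cmconfig"
	"k8s.io/ingress-gce/pkg/utils"
)

// watchASMConfig turns any legal change to the watched ConfigMap into a
// shutdown signal on stopCh.
func watchASMConfig(kubeClient kubernetes.Interface, stopCh chan struct{}) {
	cmInformer := informerv1.NewConfigMapInformer(kubeClient, "kube-system", 30*time.Second, utils.NewNamespaceIndexer())
	cmcController := cmconfig.NewConfigMapConfigController(kubeClient, nil, "kube-system", "ingress-controller-asm-cm-config")
	cmcController.RegisterInformer(cmInformer, func() {
		// glbc instead calls lbc.Stop(false) here so the whole pod restarts.
		close(stopCh)
	})
	go cmInformer.Run(stopCh)
}
```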
restart", + defaultConfigMapData: nil, + updateConifgMapData: map[string]string{"enable-asm": "false"}, + wantConfig: &defaultConfig, + wantUpdateConfig: &defaultConfig, + wantStop: false, + wantLog: "Not found the configmap based config", + donotWantLog: "", + }, + { + desc: "update the default config should trigger a restart", + defaultConfigMapData: map[string]string{"enable-asm": "false"}, + updateConifgMapData: map[string]string{"enable-asm": "true"}, + wantConfig: &defaultConfig, + wantUpdateConfig: &Config{EnableASM: true, ASMServiceNEGSkipNamespaces: []string{"kube-system"}}, + wantStop: true, + wantLog: "", + donotWantLog: "Not found the configmap based config", + }, + { + desc: "invalide config should give the default config", + defaultConfigMapData: map[string]string{"enable-asm": "TTTTT"}, + updateConifgMapData: nil, + wantConfig: &defaultConfig, + wantUpdateConfig: nil, + wantStop: false, + wantLog: "unvalid value", + donotWantLog: "", + }, + } + + for _, tc := range testcases { + t.Run(tc.desc, func(t *testing.T) { + var logBuf bytes.Buffer + stopped := false + klog.SetOutput(&logBuf) + defer func() { + klog.SetOutput(os.Stderr) + }() + + fakeClient := fake.NewSimpleClientset() + cmInformer := informerv1.NewConfigMapInformer(fakeClient, "", 30*time.Second, utils.NewNamespaceIndexer()) + cmLister := cmInformer.GetIndexer() + + if tc.defaultConfigMapData != nil { + fakeClient.CoreV1().ConfigMaps(testNamespace).Create(&v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Namespace: testNamespace, Name: testConfigMapName}, + Data: tc.defaultConfigMapData}) + } + controller := NewConfigMapConfigController(fakeClient, nil, testNamespace, testConfigMapName) + config := controller.GetConfig() + if !config.Equals(tc.wantConfig) { + t.Errorf("Default Config not equals to wantConfig, got: %v, want: %v", config, tc.wantConfig) + } + controller.RegisterInformer(cmInformer, func() { + stopped = true + }) + + if tc.updateConifgMapData != nil { + updateConfigMap := v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Namespace: testNamespace, Name: testConfigMapName}, + Data: tc.updateConifgMapData} + + cmLister.Add(&updateConfigMap) + fakeClient.CoreV1().ConfigMaps(testNamespace).Update(&updateConfigMap) + controller.processItem(&updateConfigMap, func() { + stopped = true + }) + + } + if tc.wantStop && !stopped { + t.Errorf("Controller should trigger the restart. stopped should be set to true, bug got: %v", stopped) + } + + if tc.wantLog != "" && !strings.Contains(logBuf.String(), tc.wantLog) { + t.Errorf("Missing log, got: %v, want: %v", logBuf.String(), tc.wantLog) + } + + if tc.donotWantLog != "" && strings.Contains(logBuf.String(), tc.donotWantLog) { + t.Errorf("Having not wanted log, got: %v, not want: %v", logBuf.String(), tc.donotWantLog) + } + }) + + } + +} diff --git a/pkg/context/context.go b/pkg/context/context.go index dbb07462d5..0e5802d866 100644 --- a/pkg/context/context.go +++ b/pkg/context/context.go @@ -14,6 +14,7 @@ limitations under the License. 
 
 package context
 
 import (
+	"fmt"
 	"sync"
 	"time"
@@ -26,10 +27,12 @@ import (
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
 	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
+	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
 	backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned"
 	informerbackendconfig "k8s.io/ingress-gce/pkg/backendconfig/client/informers/externalversions/backendconfig/v1beta1"
+	"k8s.io/ingress-gce/pkg/cmconfig"
 	"k8s.io/ingress-gce/pkg/common/typed"
 	frontendconfigclient "k8s.io/ingress-gce/pkg/frontendconfig/client/clientset/versioned"
 	informerfrontendconfig "k8s.io/ingress-gce/pkg/frontendconfig/client/informers/externalversions/frontendconfig/v1beta1"
@@ -46,6 +49,7 @@ const (
 
 // ControllerContext holds the state needed for the execution of the controller.
 type ControllerContext struct {
+	KubeConfig            *rest.Config
 	KubeClient            kubernetes.Interface
 	DestinationRuleClient dynamic.NamespaceableResourceInterface
 
@@ -54,6 +58,7 @@ type ControllerContext struct {
 	ClusterNamer *namer.Namer
 
 	ControllerContextConfig
+	ASMConfigController *cmconfig.ConfigMapConfigController
 
 	IngressInformer cache.SharedIndexInformer
 	ServiceInformer cache.SharedIndexInformer
@@ -63,6 +68,7 @@ type ControllerContext struct {
 	NodeInformer            cache.SharedIndexInformer
 	EndpointInformer        cache.SharedIndexInformer
 	DestinationRuleInformer cache.SharedIndexInformer
+	ConfigMapInformer       cache.SharedIndexInformer
 
 	healthChecks map[string]func() error
 
@@ -81,13 +87,15 @@ type ControllerContextConfig struct {
 	HealthCheckPath               string
 	DefaultBackendHealthCheckPath string
 	FrontendConfigEnabled         bool
-	EnableCSM                     bool
+	EnableASMConfigMap            bool
+	ASMConfigMapNamespace         string
+	ASMConfigMapName              string
 }
 
 // NewControllerContext returns a new shared set of informers.
 func NewControllerContext(
+	kubeConfig *rest.Config,
 	kubeClient kubernetes.Interface,
-	dynamicClient dynamic.Interface,
 	backendConfigClient backendconfigclient.Interface,
 	frontendConfigClient frontendconfigclient.Interface,
 	cloud *gce.Cloud,
@@ -95,6 +103,7 @@ func NewControllerContext(
 	config ControllerContextConfig) *ControllerContext {
 
 	context := &ControllerContext{
+		KubeConfig:   kubeConfig,
 		KubeClient:   kubeClient,
 		Cloud:        cloud,
 		ClusterNamer: namer,
@@ -109,16 +118,6 @@ func NewControllerContext(
 		healthChecks: make(map[string]func() error),
 	}
 
-	if config.EnableCSM && dynamicClient != nil {
-		klog.Warning("The DestinationRule group version is v1alpha3 in group networking.istio.io. Need to update as istio API graduates.")
-		destrinationGVR := schema.GroupVersionResource{Group: "networking.istio.io", Version: "v1alpha3", Resource: "destinationrules"}
-		drDynamicInformer := dynamicinformer.NewFilteredDynamicInformer(dynamicClient, destrinationGVR, config.Namespace, config.ResyncPeriod,
-			cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
-			nil)
-		context.DestinationRuleInformer = drDynamicInformer.Informer()
-		context.DestinationRuleClient = dynamicClient.Resource(destrinationGVR)
-	}
-
 	if config.FrontendConfigEnabled {
 		context.FrontendConfigInformer = informerfrontendconfig.NewFrontendConfigInformer(frontendConfigClient, config.Namespace, config.ResyncPeriod, utils.NewNamespaceIndexer())
 	}
@@ -126,6 +125,35 @@ func NewControllerContext(
 	return context
 }
 
+// Init initializes the Context, allowing some configuration to be deferred until the main thread has actually acquired the leader lock.
+func (ctx *ControllerContext) Init() {
+	// Initialize controller context internals based on the ASM ConfigMap.
+	if ctx.EnableASMConfigMap {
+		configMapInformer := informerv1.NewConfigMapInformer(ctx.KubeClient, ctx.Namespace, ctx.ResyncPeriod, utils.NewNamespaceIndexer())
+		ctx.ConfigMapInformer = configMapInformer
+		ctx.ASMConfigController = cmconfig.NewConfigMapConfigController(ctx.KubeClient, ctx.Recorder(ctx.ASMConfigMapNamespace), ctx.ASMConfigMapNamespace, ctx.ASMConfigMapName)
+
+		cmConfig := ctx.ASMConfigController.GetConfig()
+		if cmConfig.EnableASM {
+			dynamicClient, err := dynamic.NewForConfig(ctx.KubeConfig)
+			if err != nil {
+				msg := fmt.Sprintf("Failed to create kubernetes dynamic client: %v", err)
+				ctx.ASMConfigController.RecordEvent("Warning", "FailedCreateDynamicClient", msg)
+				klog.Fatal(msg)
+			}
+
+			klog.Warning("The DestinationRule group version is v1alpha3 in group networking.istio.io. Need to update as istio API graduates.")
+			destinationGVR := schema.GroupVersionResource{Group: "networking.istio.io", Version: "v1alpha3", Resource: "destinationrules"}
+			drDynamicInformer := dynamicinformer.NewFilteredDynamicInformer(dynamicClient, destinationGVR, ctx.Namespace, ctx.ResyncPeriod,
+				cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
+				nil)
+			ctx.DestinationRuleInformer = drDynamicInformer.Informer()
+			ctx.DestinationRuleClient = dynamicClient.Resource(destinationGVR)
+		}
+	}
+
+}
+
 // HasSynced returns true if all relevant informers has been synced.
 func (ctx *ControllerContext) HasSynced() bool {
 	funcs := []func() bool{
@@ -145,6 +173,10 @@ func (ctx *ControllerContext) HasSynced() bool {
 		funcs = append(funcs, ctx.DestinationRuleInformer.HasSynced)
 	}
 
+	if ctx.ConfigMapInformer != nil {
+		funcs = append(funcs, ctx.ConfigMapInformer.HasSynced)
+	}
+
 	for _, f := range funcs {
 		if !f() {
 			return false
@@ -212,6 +244,9 @@ func (ctx *ControllerContext) Start(stopCh chan struct{}) {
 	if ctx.DestinationRuleInformer != nil {
 		go ctx.DestinationRuleInformer.Run(stopCh)
 	}
+	if ctx.EnableASMConfigMap && ctx.ConfigMapInformer != nil {
+		go ctx.ConfigMapInformer.Run(stopCh)
+	}
 }
 
 // Ingresses returns the store of Ingresses.
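Putting the context.go and main.go pieces together, the restart path is: ConfigMap update → processItem detects a config diff → cancel() → lbc.Stop(false) → runControllers returns → run() calls os.Exit(0) → kubelet restarts the pod → the new process re-reads the ConfigMap in ctx.Init(). A condensed sketch of that ordering, with names taken from this PR and error handling plus the other controllers elided:

```go
package sketch

import (
	"k8s.io/ingress-gce/pkg/controller"
	ingctx "k8s.io/ingress-gce/pkg/context"
)

// runControllersSketch condenses the restart ordering from cmd/glbc/main.go.
func runControllersSketch(ctx *ingctx.ControllerContext) {
	stopCh := make(chan struct{})
	ctx.Init() // loads the ConfigMap config; builds the DestinationRule informer only when enable-asm is true
	lbc := controller.NewLoadBalancerController(ctx, stopCh)
	if ctx.EnableASMConfigMap {
		ctx.ASMConfigController.RegisterInformer(ctx.ConfigMapInformer, func() {
			lbc.Stop(false) // unblocks lbc.Run(); skip cleanup because the pod is about to restart
		})
	}
	lbc.Run() // blocks until Stop; run() in main.go then reaches os.Exit(0)
}
```

Deferring the informer and dynamic-client setup into Init() means only the pod that wins leader election pays for them, which is also why the leader-election callback, rather than main(), now owns the process exit.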
diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go
index 881bf85b73..e8ec510ed6 100644
--- a/pkg/controller/controller_test.go
+++ b/pkg/controller/controller_test.go
@@ -82,7 +82,8 @@ func newLoadBalancerController() *LoadBalancerController {
 		HealthCheckPath:               "/",
 		DefaultBackendHealthCheckPath: "/healthz",
 	}
-	ctx := context.NewControllerContext(kubeClient, nil, backendConfigClient, nil, fakeGCE, namer, ctxConfig)
+
+	ctx := context.NewControllerContext(nil, kubeClient, backendConfigClient, nil, fakeGCE, namer, ctxConfig)
 	lbc := NewLoadBalancerController(ctx, stopCh)
 	// TODO(rramkumar): Fix this so we don't have to override with our fake
 	lbc.instancePool = instances.NewNodePool(instances.NewFakeInstanceGroups(sets.NewString(), namer), namer)
diff --git a/pkg/controller/translator/translator_test.go b/pkg/controller/translator/translator_test.go
index 5cc3cb2688..c523e30e21 100644
--- a/pkg/controller/translator/translator_test.go
+++ b/pkg/controller/translator/translator_test.go
@@ -65,7 +65,7 @@ func fakeTranslator() *Translator {
 		HealthCheckPath:               "/",
 		DefaultBackendHealthCheckPath: "/healthz",
 	}
-	ctx := context.NewControllerContext(client, nil, backendConfigClient, nil, nil, defaultNamer, ctxConfig)
+	ctx := context.NewControllerContext(nil, client, backendConfigClient, nil, nil, defaultNamer, ctxConfig)
 	gce := &Translator{
 		ctx: ctx,
 	}
diff --git a/pkg/firewalls/controller_test.go b/pkg/firewalls/controller_test.go
index 88b4656350..6e7c46acfa 100644
--- a/pkg/firewalls/controller_test.go
+++ b/pkg/firewalls/controller_test.go
@@ -46,7 +46,7 @@ func newFirewallController() *FirewallController {
 		DefaultBackendSvcPort: test.DefaultBeSvcPort,
 	}
 
-	ctx := context.NewControllerContext(kubeClient, nil, backendConfigClient, nil, fakeGCE, defaultNamer, ctxConfig)
+	ctx := context.NewControllerContext(nil, kubeClient, backendConfigClient, nil, fakeGCE, defaultNamer, ctxConfig)
 	fwc := NewFirewallController(ctx, []string{"30000-32767"})
 
 	fwc.hasSynced = func() bool { return true }
diff --git a/pkg/flags/flags.go b/pkg/flags/flags.go
index 23e9d76852..4587ff46ec 100644
--- a/pkg/flags/flags.go
+++ b/pkg/flags/flags.go
@@ -60,33 +60,34 @@ const (
 var (
 	// F are global flags for the controller.
 	F = struct {
-		APIServerHost               string
-		ClusterName                 string
-		ConfigFilePath              string
-		DefaultSvcHealthCheckPath   string
-		DefaultSvc                  string
-		DefaultSvcPortName          string
-		DeleteAllOnQuit             bool
-		EnableFrontendConfig        bool
-		GCERateLimit                RateLimitSpecs
-		GCEOperationPollInterval    time.Duration
-		HealthCheckPath             string
-		HealthzPort                 int
-		InCluster                   bool
-		IngressClass                string
-		KubeConfigFile              string
-		ResyncPeriod                time.Duration
-		Version                     bool
-		WatchNamespace              string
-		NodePortRanges              PortRanges
-		NegGCPeriod                 time.Duration
-		EnableReadinessReflector    bool
-		FinalizerAdd                bool
-		FinalizerRemove             bool
-		EnableL7Ilb                 bool
-		EnableCSM                   bool
-		CSMServiceNEGSkipNamespaces []string
-		EnableNonGCPMode            bool
+		APIServerHost                    string
+		ClusterName                      string
+		ConfigFilePath                   string
+		DefaultSvcHealthCheckPath        string
+		DefaultSvc                       string
+		DefaultSvcPortName               string
+		DeleteAllOnQuit                  bool
+		EnableFrontendConfig             bool
+		GCERateLimit                     RateLimitSpecs
+		GCEOperationPollInterval         time.Duration
+		HealthCheckPath                  string
+		HealthzPort                      int
+		InCluster                        bool
+		IngressClass                     string
+		KubeConfigFile                   string
+		ResyncPeriod                     time.Duration
+		Version                          bool
+		WatchNamespace                   string
+		NodePortRanges                   PortRanges
+		NegGCPeriod                      time.Duration
+		EnableReadinessReflector         bool
+		FinalizerAdd                     bool
+		FinalizerRemove                  bool
+		EnableL7Ilb                      bool
+		EnableASMConfigMapBasedConfig    bool
+		ASMConfigMapBasedConfigNamespace string
+		ASMConfigMapBasedConfigCMName    string
+		EnableNonGCPMode                 bool
 
 		LeaderElection LeaderElectionConfiguration
 	}{}
@@ -201,8 +202,9 @@ L7 load balancing. CSV values accepted. Example: -node-port-ranges=80,8080,400-5
 		F.FinalizerRemove, "Enable removing Finalizer from Ingress.")
 	flag.BoolVar(&F.EnableL7Ilb, "enable-l7-ilb", false, `Optional, whether or not to enable L7-ILB.`)
-	flag.BoolVar(&F.EnableCSM, "enable-csm", false, "Enable CSM(Istio) support")
-	flag.StringSliceVar(&F.CSMServiceNEGSkipNamespaces, "csm-service-skip-namespaces", []string{}, "Only for CSM mode, skip the NEG creation for Services in the given namespaces.")
+	flag.BoolVar(&F.EnableASMConfigMapBasedConfig, "enable-asm-config-map-config", false, "Enable the ASM ConfigMap based configuration.")
+	flag.StringVar(&F.ASMConfigMapBasedConfigNamespace, "asm-configmap-based-config-namespace", "kube-system", "ASM ConfigMap based config: the ConfigMap's namespace")
+	flag.StringVar(&F.ASMConfigMapBasedConfigCMName, "asm-configmap-based-config-cmname", "ingress-controller-asm-cm-config", "ASM ConfigMap based config: the ConfigMap's name")
 	flag.BoolVar(&F.EnableNonGCPMode, "enable-non-gcp-mode", false, "Set to true when running on a non-GCP cluster.")
 }
diff --git a/pkg/neg/controller.go b/pkg/neg/controller.go
index 5b202e27c8..0736148739 100644
--- a/pkg/neg/controller.go
+++ b/pkg/neg/controller.go
@@ -70,8 +70,8 @@ type Controller struct {
 	defaultBackendService utils.ServicePort
 	destinationRuleLister cache.Indexer
 	destinationRuleClient dynamic.NamespaceableResourceInterface
-	enableCSM                   bool
-	csmServiceNEGSkipNamespaces []string
+	enableASM                   bool
+	asmServiceNEGSkipNamespaces []string
 
 	// serviceQueue takes service key as work item. Service key with format "namespace/name".
 	serviceQueue workqueue.RateLimitingInterface
@@ -97,8 +97,6 @@ func NewController(
 	resyncPeriod time.Duration,
 	gcPeriod time.Duration,
 	enableReadinessReflector bool,
-	enableCSM bool,
-	csmServiceNEGSkipNamespaces []string,
 ) *Controller {
 	// init event recorder
 	// TODO: move event recorder initializer to main. Reuse it among controllers.
@@ -120,23 +118,21 @@ func NewController(
 	manager.reflector = reflector
 
 	negController := &Controller{
-		client:                      ctx.KubeClient,
-		manager:                     manager,
-		resyncPeriod:                resyncPeriod,
-		gcPeriod:                    gcPeriod,
-		recorder:                    recorder,
-		zoneGetter:                  zoneGetter,
-		namer:                       namer,
-		defaultBackendService:       ctx.DefaultBackendSvcPort,
-		hasSynced:                   ctx.HasSynced,
-		ingressLister:               ctx.IngressInformer.GetIndexer(),
-		serviceLister:               ctx.ServiceInformer.GetIndexer(),
-		serviceQueue:                workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
-		endpointQueue:               workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
-		syncTracker:                 utils.NewTimeTracker(),
-		reflector:                   reflector,
-		enableCSM:                   enableCSM,
-		csmServiceNEGSkipNamespaces: csmServiceNEGSkipNamespaces,
+		client:                ctx.KubeClient,
+		manager:               manager,
+		resyncPeriod:          resyncPeriod,
+		gcPeriod:              gcPeriod,
+		recorder:              recorder,
+		zoneGetter:            zoneGetter,
+		namer:                 namer,
+		defaultBackendService: ctx.DefaultBackendSvcPort,
+		hasSynced:             ctx.HasSynced,
+		ingressLister:         ctx.IngressInformer.GetIndexer(),
+		serviceLister:         ctx.ServiceInformer.GetIndexer(),
+		serviceQueue:          workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
+		endpointQueue:         workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
+		syncTracker:           utils.NewTimeTracker(),
+		reflector:             reflector,
 	}
 
 	ctx.IngressInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -198,16 +194,21 @@ func NewController(
 		},
 	})
 
-	if enableCSM {
-		negController.destinationRuleLister = ctx.DestinationRuleInformer.GetIndexer()
-		ctx.DestinationRuleInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
-			AddFunc:    negController.enqueueDestinationRule,
-			DeleteFunc: negController.enqueueDestinationRule,
-			UpdateFunc: func(old, cur interface{}) {
-				negController.enqueueDestinationRule(cur)
-			},
-		})
-		negController.destinationRuleClient = ctx.DestinationRuleClient
+	if ctx.EnableASMConfigMap {
+		cmConfig := ctx.ASMConfigController.GetConfig()
+		if cmConfig.EnableASM {
+			negController.enableASM = cmConfig.EnableASM
+			negController.asmServiceNEGSkipNamespaces = cmConfig.ASMServiceNEGSkipNamespaces
+			negController.destinationRuleLister = ctx.DestinationRuleInformer.GetIndexer()
+			ctx.DestinationRuleInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+				AddFunc:    negController.enqueueDestinationRule,
+				DeleteFunc: negController.enqueueDestinationRule,
+				UpdateFunc: func(old, cur interface{}) {
+					negController.enqueueDestinationRule(cur)
+				},
+			})
+			negController.destinationRuleClient = ctx.DestinationRuleClient
+		}
 	}
 
 	ctx.AddHealthCheck("neg-controller", negController.IsHealthy)
@@ -458,7 +459,7 @@ func (c *Controller) mergeDefaultBackendServicePortInfoMap(key string, portInfoM
 func (c *Controller) getCSMPortInfoMap(namespace, name string, service *apiv1.Service) (negtypes.PortInfoMap, negtypes.PortInfoMap, error) {
 	destinationRulesPortInfoMap := make(negtypes.PortInfoMap)
 	servicePortInfoMap := make(negtypes.PortInfoMap)
-	if c.enableCSM {
+	if c.enableASM {
 		// Find all destination rules that using this service.
 		destinationRules := getDestinationRulesFromStore(c.destinationRuleLister, service)
 		// Fill all service ports into portinfomap
@@ -478,7 +479,7 @@ func (c *Controller) getCSMPortInfoMap(namespace, name string, service *apiv1.Se
 		// Create NEGs for every ports of the services.
 		if service.Spec.Selector == nil || len(service.Spec.Selector) == 0 {
 			klog.Infof("Skip NEG creation for services that with no selector: %s:%s", namespace, name)
-		} else if contains(c.csmServiceNEGSkipNamespaces, namespace) {
+		} else if contains(c.asmServiceNEGSkipNamespaces, namespace) {
 			klog.Infof("Skip NEG creation for services in namespace: %s", namespace)
 		} else {
 			servicePortInfoMap = negtypes.NewPortInfoMap(namespace, name, servicePorts, c.namer, false)
diff --git a/pkg/neg/controller_test.go b/pkg/neg/controller_test.go
index 0253a0f9d8..f90cef717e 100644
--- a/pkg/neg/controller_test.go
+++ b/pkg/neg/controller_test.go
@@ -25,6 +25,11 @@ import (
 	"testing"
 	"time"
 
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/dynamic/dynamicinformer"
+	informerv1 "k8s.io/client-go/informers/core/v1"
+	"k8s.io/client-go/tools/cache"
+
 	istioV1alpha3 "istio.io/api/networking/v1alpha3"
 	apiv1 "k8s.io/api/core/v1"
 	"k8s.io/api/networking/v1beta1"
@@ -39,6 +44,7 @@ import (
 	"k8s.io/client-go/kubernetes/fake"
 	"k8s.io/ingress-gce/pkg/annotations"
 	backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned/fake"
+	"k8s.io/ingress-gce/pkg/cmconfig"
 	"k8s.io/ingress-gce/pkg/context"
 	"k8s.io/ingress-gce/pkg/flags"
 	negtypes "k8s.io/ingress-gce/pkg/neg/types"
@@ -73,14 +79,31 @@ func newTestController(kubeClient kubernetes.Interface) *Controller {
 	dynamicSchema := runtime.NewScheme()
 	//dynamicSchema.AddKnownTypeWithName(schema.GroupVersionKind{Group: "networking.istio.io", Version: "v1alpha3", Kind: "List"}, &unstructured.UnstructuredList{})
-	dynamicClient := dynamicfake.NewSimpleDynamicClient(dynamicSchema)
+
+	kubeClient.CoreV1().ConfigMaps("kube-system").Create(&apiv1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: "kube-system", Name: "ingress-controller-config-test"}, Data: map[string]string{"enable-asm": "true"}})
+
 	ctxConfig := context.ControllerContextConfig{
 		Namespace:             apiv1.NamespaceAll,
 		ResyncPeriod:          1 * time.Second,
 		DefaultBackendSvcPort: defaultBackend,
-		EnableCSM:             true,
+		EnableASMConfigMap:    true,
+		ASMConfigMapNamespace: "kube-system",
+		ASMConfigMapName:      "ingress-controller-config-test",
 	}
-	context := context.NewControllerContext(kubeClient, dynamicClient, backendConfigClient, nil, gce.NewFakeGCECloud(gce.DefaultTestClusterValues()), namer, ctxConfig)
+	context := context.NewControllerContext(nil, kubeClient, backendConfigClient, nil, gce.NewFakeGCECloud(gce.DefaultTestClusterValues()), namer, ctxConfig)
+
+	// Hack: replicate what context.Init() does, so the test can wire up fake informers and clients by hand.
+	configMapInformer := informerv1.NewConfigMapInformer(kubeClient, context.Namespace, context.ResyncPeriod, utils.NewNamespaceIndexer())
+	context.ConfigMapInformer = configMapInformer
+	context.ASMConfigController = cmconfig.NewConfigMapConfigController(kubeClient, nil, context.ASMConfigMapNamespace, context.ASMConfigMapName)
+	dynamicClient := dynamicfake.NewSimpleDynamicClient(dynamicSchema)
+
+	destinationGVR := schema.GroupVersionResource{Group: "networking.istio.io", Version: "v1alpha3", Resource: "destinationrules"}
+	drDynamicInformer := dynamicinformer.NewFilteredDynamicInformer(dynamicClient, destinationGVR, context.Namespace, context.ResyncPeriod,
+		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
+		nil)
+	context.DestinationRuleInformer = drDynamicInformer.Informer()
+	context.DestinationRuleClient = dynamicClient.Resource(destinationGVR)
+
 	controller := NewController(
 		negtypes.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network"),
 		context,
@@ -90,8 +113,6 @@ func newTestController(kubeClient kubernetes.Interface) *Controller {
 		1*time.Second,
 		// TODO(freehan): enable readiness reflector for unit tests
 		false,
-		true,
-		nil,
 	)
 	return controller
 }
diff --git a/pkg/neg/manager.go b/pkg/neg/manager.go
index 621a6a904b..eaab2df7db 100644
--- a/pkg/neg/manager.go
+++ b/pkg/neg/manager.go
@@ -18,10 +18,11 @@ package neg
 
 import (
 	"fmt"
-	"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
+	"reflect"
 	"sync"
 
-	"k8s.io/api/core/v1"
+	"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	"k8s.io/apimachinery/pkg/util/sets"
@@ -32,7 +33,6 @@ import (
 	negsyncer "k8s.io/ingress-gce/pkg/neg/syncers"
 	negtypes "k8s.io/ingress-gce/pkg/neg/types"
 	"k8s.io/klog"
-	"reflect"
 )
 
 type serviceKey struct {
diff --git a/pkg/neg/manager_test.go b/pkg/neg/manager_test.go
index 846b8414a5..33f62bd485 100644
--- a/pkg/neg/manager_test.go
+++ b/pkg/neg/manager_test.go
@@ -74,7 +74,7 @@ func NewTestSyncerManager(kubeClient kubernetes.Interface) *syncerManager {
 		ResyncPeriod:          1 * time.Second,
 		DefaultBackendSvcPort: defaultBackend,
 	}
-	context := context.NewControllerContext(kubeClient, nil, backendConfigClient, nil, gce.NewFakeGCECloud(gce.DefaultTestClusterValues()), namer, ctxConfig)
+	context := context.NewControllerContext(nil, kubeClient, backendConfigClient, nil, gce.NewFakeGCECloud(gce.DefaultTestClusterValues()), namer, ctxConfig)
 
 	manager := newSyncerManager(
 		namer,
diff --git a/pkg/neg/readiness/reflector_test.go b/pkg/neg/readiness/reflector_test.go
index b2f5ee75c3..a913e8aa83 100644
--- a/pkg/neg/readiness/reflector_test.go
+++ b/pkg/neg/readiness/reflector_test.go
@@ -62,7 +62,7 @@ func fakeContext() *context.ControllerContext {
 	}
 	fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
 	negtypes.MockNetworkEndpointAPIs(fakeGCE)
-	context := context.NewControllerContext(kubeClient, nil, nil, nil, fakeGCE, namer, ctxConfig)
+	context := context.NewControllerContext(nil, kubeClient, nil, nil, fakeGCE, namer, ctxConfig)
 	return context
 }
 
diff --git a/pkg/neg/syncers/syncer_test.go b/pkg/neg/syncers/syncer_test.go
index 5e8dd5e05e..5ce521fe3e 100644
--- a/pkg/neg/syncers/syncer_test.go
+++ b/pkg/neg/syncers/syncer_test.go
@@ -88,7 +88,7 @@ func newSyncerTester() *syncerTester {
 		ResyncPeriod:          1 * time.Second,
 		DefaultBackendSvcPort: defaultBackend,
 	}
-	context := context.NewControllerContext(kubeClient, nil, backendConfigClient, nil, nil, namer, ctxConfig)
+	context := context.NewControllerContext(nil, kubeClient, backendConfigClient, nil, nil, namer, ctxConfig)
 	negSyncerKey := negtypes.NegSyncerKey{
 		Namespace: testServiceNamespace,
 		Name:      testServiceName,
diff --git a/pkg/neg/syncers/transaction_test.go b/pkg/neg/syncers/transaction_test.go
index c3d9b3ca8c..c18e4f4098 100644
--- a/pkg/neg/syncers/transaction_test.go
+++ b/pkg/neg/syncers/transaction_test.go
@@ -824,7 +824,7 @@ func newTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud) (negty
 		ResyncPeriod:          1 * time.Second,
 		DefaultBackendSvcPort: defaultBackend,
 	}
-	context := context.NewControllerContext(kubeClient, nil, backendConfigClient, nil, nil, namer, ctxConfig)
+	context := context.NewControllerContext(nil, kubeClient, backendConfigClient, nil, nil, namer, ctxConfig)
 	svcPort := negtypes.NegSyncerKey{
 		Namespace: testNamespace,
 		Name:      testService,
diff --git a/pkg/neg/syncers/utils.go b/pkg/neg/syncers/utils.go
index 547c76b81d..43f4e91200 100644
--- a/pkg/neg/syncers/utils.go
+++ b/pkg/neg/syncers/utils.go
@@ -18,7 +18,6 @@ package syncers
 
 import (
 	"fmt"
-	"k8s.io/ingress-gce/pkg/composite"
 	"strconv"
 	"strings"
 	"time"
@@ -31,6 +30,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
+	"k8s.io/ingress-gce/pkg/composite"
 	negtypes "k8s.io/ingress-gce/pkg/neg/types"
 	"k8s.io/ingress-gce/pkg/utils"
 	"k8s.io/klog"
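End to end, flipping the feature on at runtime is just a write to the watched ConfigMap: the informer registered above notices the change and restarts the controller pod. A hypothetical operator snippet illustrating that (the enableASM helper is not part of this PR; client-go of this vintage takes no context argument on Get/Update):

```go
package sketch

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// enableASM sets enable-asm to "true" in the ConfigMap shipped in
// docs/deploy/resources/configmap-based-config.yaml, triggering a restart.
func enableASM(kubeClient kubernetes.Interface) error {
	cm, err := kubeClient.CoreV1().ConfigMaps("kube-system").Get("ingress-controller-asm-cm-config", metav1.GetOptions{})
	if err != nil {
		return err
	}
	if cm.Data == nil {
		cm.Data = map[string]string{}
	}
	cm.Data["enable-asm"] = "true"
	_, err = kubeClient.CoreV1().ConfigMaps("kube-system").Update(cm)
	return err
}
```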