From 7c341b5e8327c0d3420c3baf75a5e03254fadbad Mon Sep 17 00:00:00 2001 From: Rohit Ramkumar Date: Mon, 6 Aug 2018 09:18:39 -0700 Subject: [PATCH] Refactor to remove ClusterManager completely --- cmd/glbc/main.go | 50 +++---- pkg/context/context.go | 12 +- pkg/controller/cluster_manager.go | 145 ------------------- pkg/controller/controller.go | 138 ++++++++++++++---- pkg/controller/controller_test.go | 37 +++-- pkg/controller/fakes.go | 71 --------- pkg/controller/node.go | 13 +- pkg/controller/translator/translator.go | 7 +- pkg/controller/translator/translator_test.go | 17 ++- pkg/controller/utils_test.go | 23 ++- pkg/firewalls/controller.go | 5 +- pkg/firewalls/controller_test.go | 4 +- pkg/neg/controller_test.go | 5 +- pkg/neg/manager_test.go | 7 +- pkg/neg/syncer_test.go | 4 +- 15 files changed, 206 insertions(+), 332 deletions(-) delete mode 100644 pkg/controller/cluster_manager.go delete mode 100644 pkg/controller/fakes.go diff --git a/cmd/glbc/main.go b/cmd/glbc/main.go index 7dae5222af..6c616514a2 100644 --- a/cmd/glbc/main.go +++ b/cmd/glbc/main.go @@ -100,17 +100,27 @@ func main() { } } + namer, err := app.NewNamer(kubeClient, flags.F.ClusterName, firewalls.DefaultFirewallName) + if err != nil { + glog.Fatalf("app.NewNamer(ctx.KubeClient, %q, %q) = %v", flags.F.ClusterName, firewalls.DefaultFirewallName, err) + } + if namer.UID() != "" { + glog.V(0).Infof("Cluster name: %+v", namer.UID()) + } + cloud := app.NewGCEClient() enableNEG := flags.F.Features.NEG defaultBackendServicePortID := app.DefaultBackendServicePortID(kubeClient) ctxConfig := context.ControllerContextConfig{ - NEGEnabled: enableNEG, - BackendConfigEnabled: flags.F.EnableBackendConfig, - Namespace: flags.F.WatchNamespace, - ResyncPeriod: flags.F.ResyncPeriod, - DefaultBackendSvcPortID: defaultBackendServicePortID, - } - ctx := context.NewControllerContext(kubeClient, backendConfigClient, cloud, ctxConfig) + NEGEnabled: enableNEG, + BackendConfigEnabled: flags.F.EnableBackendConfig, + Namespace: flags.F.WatchNamespace, + ResyncPeriod: flags.F.ResyncPeriod, + DefaultBackendSvcPortID: defaultBackendServicePortID, + HealthCheckPath: flags.F.HealthCheckPath, + DefaultBackendHealthCheckPath: flags.F.DefaultSvcHealthCheckPath, + } + ctx := context.NewControllerContext(kubeClient, backendConfigClient, cloud, namer, ctxConfig) go app.RunHTTPServer(ctx.HealthCheck) if !flags.F.LeaderElection.LeaderElect { @@ -167,33 +177,14 @@ func makeLeaderElectionConfig(client clientset.Interface, recorder record.EventR } func runControllers(ctx *context.ControllerContext) { - namer, err := app.NewNamer(ctx.KubeClient, flags.F.ClusterName, firewalls.DefaultFirewallName) - if err != nil { - glog.Fatalf("app.NewNamer(ctx.KubeClient, %q, %q) = %v", flags.F.ClusterName, firewalls.DefaultFirewallName, err) - } - - clusterManager, err := controller.NewClusterManager(ctx, namer, flags.F.HealthCheckPath, flags.F.DefaultSvcHealthCheckPath) - if err != nil { - glog.Fatalf("controller.NewClusterManager(cloud, namer, %q, %q) = %v", flags.F.HealthCheckPath, flags.F.DefaultSvcHealthCheckPath, err) - } - stopCh := make(chan struct{}) - lbc := controller.NewLoadBalancerController(ctx, clusterManager, stopCh) - if err != nil { - glog.Fatalf("controller.NewLoadBalancerController(ctx, clusterManager, stopCh) = %v", err) - } + lbc := controller.NewLoadBalancerController(ctx, stopCh) - fwc := firewalls.NewFirewallController(ctx, namer, flags.F.NodePortRanges.Values()) - - if clusterManager.ClusterNamer.UID() != "" { - glog.V(0).Infof("Cluster name: %+v", 
clusterManager.ClusterNamer.UID()) - } - clusterManager.Init(lbc.Translator, lbc.Translator) - glog.V(0).Infof("clusterManager initialized") + fwc := firewalls.NewFirewallController(ctx, flags.F.NodePortRanges.Values()) if ctx.NEGEnabled { // TODO: Refactor NEG to use cloud mocks so ctx.Cloud can be referenced within NewController. - negController := neg.NewController(ctx.Cloud, ctx, lbc.Translator, namer, flags.F.ResyncPeriod) + negController := neg.NewController(ctx.Cloud, ctx, lbc.Translator, ctx.ClusterNamer, flags.F.ResyncPeriod) go negController.Run(stopCh) glog.V(0).Infof("negController started") } @@ -204,6 +195,7 @@ func runControllers(ctx *context.ControllerContext) { glog.V(0).Infof("firewall controller started") ctx.Start(stopCh) + lbc.Init() lbc.Run() for { diff --git a/pkg/context/context.go b/pkg/context/context.go index 88228f79df..e5393c85ee 100644 --- a/pkg/context/context.go +++ b/pkg/context/context.go @@ -53,6 +53,8 @@ type ControllerContext struct { Cloud *gce.GCECloud + ClusterNamer *utils.Namer + ControllerContextConfig IngressInformer cache.SharedIndexInformer @@ -77,7 +79,9 @@ type ControllerContextConfig struct { Namespace string ResyncPeriod time.Duration // DefaultBackendSvcPortID is the ServicePortID for the system default backend. - DefaultBackendSvcPortID utils.ServicePortID + DefaultBackendSvcPortID utils.ServicePortID + HealthCheckPath string + DefaultBackendHealthCheckPath string } // NewControllerContext returns a new shared set of informers. @@ -85,11 +89,13 @@ func NewControllerContext( kubeClient kubernetes.Interface, backendConfigClient backendconfigclient.Interface, cloud *gce.GCECloud, + namer *utils.Namer, config ControllerContextConfig) *ControllerContext { context := &ControllerContext{ - KubeClient: kubeClient, - Cloud: cloud, + KubeClient: kubeClient, + Cloud: cloud, + ClusterNamer: namer, ControllerContextConfig: config, IngressInformer: informerv1beta1.NewIngressInformer(kubeClient, config.Namespace, config.ResyncPeriod, NewIndexer()), ServiceInformer: informerv1.NewServiceInformer(kubeClient, config.Namespace, config.ResyncPeriod, NewIndexer()), diff --git a/pkg/controller/cluster_manager.go b/pkg/controller/cluster_manager.go deleted file mode 100644 index 59b1176317..0000000000 --- a/pkg/controller/cluster_manager.go +++ /dev/null @@ -1,145 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package controller - -import ( - "github.com/golang/glog" - - compute "google.golang.org/api/compute/v1" - - "k8s.io/ingress-gce/pkg/backends" - "k8s.io/ingress-gce/pkg/context" - "k8s.io/ingress-gce/pkg/healthchecks" - "k8s.io/ingress-gce/pkg/instances" - "k8s.io/ingress-gce/pkg/loadbalancers" - "k8s.io/ingress-gce/pkg/utils" -) - -// ClusterManager manages cluster resource pools. -type ClusterManager struct { - ClusterNamer *utils.Namer - instancePool instances.NodePool - backendPool backends.BackendPool - l7Pool loadbalancers.LoadBalancerPool - - // TODO: Refactor so we simply init a health check pool. 
- healthChecker healthchecks.HealthChecker -} - -// Init initializes the cluster manager. -func (c *ClusterManager) Init(zl instances.ZoneLister, pp backends.ProbeProvider) { - c.instancePool.Init(zl) - c.backendPool.Init(pp) - // TODO: Initialize other members as needed. -} - -func (c *ClusterManager) shutdown() error { - if err := c.l7Pool.Shutdown(); err != nil { - return err - } - // The backend pool will also delete instance groups. - return c.backendPool.Shutdown() -} - -func (c *ClusterManager) EnsureInstanceGroupsAndPorts(nodeNames []string, servicePorts []utils.ServicePort) ([]*compute.InstanceGroup, error) { - // Convert to slice of NodePort int64s. - ports := []int64{} - for _, p := range uniq(servicePorts) { - if !p.NEGEnabled { - ports = append(ports, p.NodePort) - } - } - - // Create instance groups and set named ports. - igs, err := c.instancePool.EnsureInstanceGroupsAndPorts(c.ClusterNamer.InstanceGroup(), ports) - if err != nil { - return nil, err - } - - // Add/remove instances to the instance groups. - if err = c.instancePool.Sync(nodeNames); err != nil { - return nil, err - } - - return igs, err -} - -// GC garbage collects unused resources. -// - lbNames are the names of L7 loadbalancers we wish to exist. Those not in -// this list are removed from the cloud. -// - nodePorts are the ports for which we want BackendServies. BackendServices -// for ports not in this list are deleted. -// This method ignores googleapi 404 errors (StatusNotFound). -func (c *ClusterManager) GC(lbNames []string, nodePorts []utils.ServicePort) error { - // On GC: - // * Loadbalancers need to get deleted before backends. - // * Backends are refcounted in a shared pool. - // * We always want to GC backends even if there was an error in GCing - // loadbalancers, because the next Sync could rely on the GC for quota. - // * There are at least 2 cases for backend GC: - // 1. The loadbalancer has been deleted. - // 2. An update to the url map drops the refcount of a backend. This can - // happen when an Ingress is updated, if we don't GC after the update - // we'll leak the backend. - lbErr := c.l7Pool.GC(lbNames) - beErr := c.backendPool.GC(nodePorts) - if lbErr != nil { - return lbErr - } - if beErr != nil { - return beErr - } - - // TODO(ingress#120): Move this to the backend pool so it mirrors creation - if len(lbNames) == 0 { - igName := c.ClusterNamer.InstanceGroup() - glog.Infof("Deleting instance group %v", igName) - if err := c.instancePool.DeleteInstanceGroup(igName); err != err { - return err - } - glog.V(2).Infof("Shutting down firewall as there are no loadbalancers") - } - - return nil -} - -// NewClusterManager creates a cluster manager for shared resources. -// - namer: is the namer used to tag cluster wide shared resources. -// - defaultBackendNodePort: is the node port of glbc's default backend. This is -// the kubernetes Service that serves the 404 page if no urls match. -// - healthCheckPath: is the default path used for L7 health checks, eg: "/healthz". -// - defaultBackendHealthCheckPath: is the default path used for the default backend health checks. -func NewClusterManager( - ctx *context.ControllerContext, - namer *utils.Namer, - healthCheckPath string, - defaultBackendHealthCheckPath string) (*ClusterManager, error) { - - // Names are fundamental to the cluster, the uid allocator makes sure names don't collide. - cluster := ClusterManager{ClusterNamer: namer} - - // NodePool stores GCE vms that are in this Kubernetes cluster. 
- cluster.instancePool = instances.NewNodePool(ctx.Cloud, namer) - - // BackendPool creates GCE BackendServices and associated health checks. - cluster.healthChecker = healthchecks.NewHealthChecker(ctx.Cloud, healthCheckPath, defaultBackendHealthCheckPath, cluster.ClusterNamer, ctx.DefaultBackendSvcPortID.Service) - cluster.backendPool = backends.NewBackendPool(ctx.Cloud, ctx.Cloud, cluster.healthChecker, cluster.instancePool, cluster.ClusterNamer, ctx.BackendConfigEnabled, true) - - // L7 pool creates targetHTTPProxy, ForwardingRules, UrlMaps, StaticIPs. - cluster.l7Pool = loadbalancers.NewLoadBalancerPool(ctx.Cloud, cluster.ClusterNamer) - return &cluster, nil -} diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 627474829f..997d2e4833 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -24,6 +24,7 @@ import ( "time" "github.com/golang/glog" + compute "google.golang.org/api/compute/v1" apiv1 "k8s.io/api/core/v1" extensions "k8s.io/api/extensions/v1beta1" @@ -34,6 +35,9 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" backendconfigv1beta1 "k8s.io/ingress-gce/pkg/apis/backendconfig/v1beta1" + "k8s.io/ingress-gce/pkg/backends" + "k8s.io/ingress-gce/pkg/healthchecks" + "k8s.io/ingress-gce/pkg/instances" "k8s.io/ingress-gce/pkg/annotations" "k8s.io/ingress-gce/pkg/context" @@ -54,10 +58,9 @@ type LoadBalancerController struct { nodes *NodeController // TODO: Watch secrets - CloudClusterManager *ClusterManager - ingQueue utils.TaskQueue - Translator *translator.Translator - stopCh chan struct{} + ingQueue utils.TaskQueue + Translator *translator.Translator + stopCh chan struct{} // stopLock is used to enforce only a single call to Stop is active. // Needed because we allow stopping through an http endpoint and // allowing concurrent stoppers leads to stack traces. @@ -68,15 +71,16 @@ type LoadBalancerController struct { // hasSynced returns true if all associated sub-controllers have synced. // Abstracted into a func for testing. hasSynced func() bool + + // Resource pools. + instancePool instances.NodePool + backendPool backends.BackendPool + l7Pool loadbalancers.LoadBalancerPool } // NewLoadBalancerController creates a controller for gce loadbalancers. -// - clusterManager: A ClusterManager capable of creating all cloud resources -// required for L7 loadbalancing. -// - resyncPeriod: Watchers relist from the Kubernetes API server this often. 
func NewLoadBalancerController( ctx *context.ControllerContext, - clusterManager *ClusterManager, stopCh chan struct{}) *LoadBalancerController { broadcaster := record.NewBroadcaster() @@ -84,16 +88,24 @@ func NewLoadBalancerController( broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{ Interface: ctx.KubeClient.Core().Events(""), }) + healthChecker := healthchecks.NewHealthChecker(ctx.Cloud, ctx.HealthCheckPath, ctx.DefaultBackendHealthCheckPath, ctx.ClusterNamer, ctx.DefaultBackendSvcPortID.Service) + instancePool := instances.NewNodePool(ctx.Cloud, ctx.ClusterNamer) + backendPool := backends.NewBackendPool(ctx.Cloud, ctx.Cloud, healthChecker, instancePool, ctx.ClusterNamer, ctx.BackendConfigEnabled, true) lbc := LoadBalancerController{ - client: ctx.KubeClient, - ctx: ctx, - ingLister: utils.StoreToIngressLister{Store: ctx.IngressInformer.GetStore()}, - nodeLister: ctx.NodeInformer.GetIndexer(), - nodes: NewNodeController(ctx, clusterManager), - CloudClusterManager: clusterManager, - stopCh: stopCh, - hasSynced: ctx.HasSynced, + client: ctx.KubeClient, + ctx: ctx, + ingLister: utils.StoreToIngressLister{Store: ctx.IngressInformer.GetStore()}, + nodeLister: ctx.NodeInformer.GetIndexer(), + Translator: translator.NewTranslator(ctx), + tlsLoader: &tls.TLSCertsFromSecretsLoader{Client: ctx.KubeClient}, + stopCh: stopCh, + hasSynced: ctx.HasSynced, + nodes: NewNodeController(ctx, instancePool), + instancePool: instancePool, + backendPool: backendPool, + l7Pool: loadbalancers.NewLoadBalancerPool(ctx.Cloud, ctx.ClusterNamer), } + lbc.ingQueue = utils.NewPeriodicTaskQueue("ingresses", lbc.sync) // Ingress event handlers. @@ -175,9 +187,6 @@ func NewLoadBalancerController( }) } - lbc.Translator = translator.NewTranslator(lbc.CloudClusterManager.ClusterNamer, lbc.ctx) - lbc.tlsLoader = &tls.TLSCertsFromSecretsLoader{Client: lbc.client} - // Register health check on controller context. ctx.AddHealthCheck("ingress", lbc.IsHealthy) @@ -186,6 +195,12 @@ func NewLoadBalancerController( return &lbc } +func (lbc *LoadBalancerController) Init() { + // TODO(rramkumar): Try to get rid of this "Init". + lbc.instancePool.Init(lbc.Translator) + lbc.backendPool.Init(lbc.Translator) +} + // Run starts the loadbalancer controller. func (lbc *LoadBalancerController) Run() { glog.Infof("Starting loadbalancer controller") @@ -213,9 +228,14 @@ func (lbc *LoadBalancerController) Stop(deleteAll bool) error { } // Deleting shared cluster resources is idempotent. + // TODO(rramkumar): Do we need deleteAll? Can we get rid of its' flag? if deleteAll { glog.Infof("Shutting down cluster manager.") - return lbc.CloudClusterManager.shutdown() + if err := lbc.l7Pool.Shutdown(); err != nil { + return err + } + // The backend pool will also delete instance groups. + return lbc.backendPool.Shutdown() } return nil } @@ -224,7 +244,7 @@ func (lbc *LoadBalancerController) Stop(deleteAll bool) error { func (lbc *LoadBalancerController) IsHealthy() (err error) { // TODO: Expand on this, for now we just want to detect when the GCE client // is broken. - _, err = lbc.CloudClusterManager.backendPool.List() + _, err = lbc.backendPool.List() // If this container is scheduled on a node without compute/rw it is // effectively useless, but it is healthy. Reporting it as unhealthy @@ -263,7 +283,7 @@ func (lbc *LoadBalancerController) sync(key string) (retErr error) { if !ingExists { glog.V(2).Infof("Ingress %q no longer exists, triggering GC", key) // GC will find GCE resources that were used for this ingress and delete them. 
- return lbc.CloudClusterManager.GC(lbNames, gceSvcPorts) + return lbc.gc(lbNames, gceSvcPorts) } // Get ingress and DeepCopy for assurance that we don't pollute other goroutines with changes. @@ -281,7 +301,7 @@ func (lbc *LoadBalancerController) sync(key string) (retErr error) { // Garbage collection will occur regardless of an error occurring. If an error occurred, // it could have been caused by quota issues; therefore, garbage collecting now may // free up enough quota for the next sync to pass. - if gcErr := lbc.CloudClusterManager.GC(lbNames, gceSvcPorts); gcErr != nil { + if gcErr := lbc.gc(lbNames, gceSvcPorts); gcErr != nil { retErr = fmt.Errorf("error during sync %v, error during GC %v", retErr, gcErr) } @@ -295,7 +315,7 @@ func (lbc *LoadBalancerController) ensureIngress(ing *extensions.Ingress, nodeNa } ingSvcPorts := urlMap.AllServicePorts() - igs, err := lbc.CloudClusterManager.EnsureInstanceGroupsAndPorts(nodeNames, ingSvcPorts) + igs, err := lbc.ensureInstanceGroupsAndPorts(ingSvcPorts, nodeNames) if err != nil { return err } @@ -321,12 +341,12 @@ func (lbc *LoadBalancerController) ensureIngress(ing *extensions.Ingress, nodeNa } // Create the backends. Note that we only need the IG links. - if err := lbc.CloudClusterManager.backendPool.Ensure(uniq(ingSvcPorts), utils.IGLinks(igs)); err != nil { + if err := lbc.backendPool.Ensure(uniq(ingSvcPorts), utils.IGLinks(igs)); err != nil { return err } // Create higher-level LB resources. - if err := lbc.CloudClusterManager.l7Pool.Sync(lb); err != nil { + if err := lbc.l7Pool.Sync(lb); err != nil { return err } @@ -338,7 +358,7 @@ func (lbc *LoadBalancerController) ensureIngress(ing *extensions.Ingress, nodeNa if err != nil { return err } - if err := lbc.CloudClusterManager.backendPool.Link(svcPort, zones); err != nil { + if err := lbc.backendPool.Link(svcPort, zones); err != nil { return err } } @@ -346,7 +366,7 @@ func (lbc *LoadBalancerController) ensureIngress(ing *extensions.Ingress, nodeNa } // Get the loadbalancer and update the ingress status. - l7, err := lbc.CloudClusterManager.l7Pool.Get(lb.Name) + l7, err := lbc.l7Pool.Get(lb.Name) if err != nil { return fmt.Errorf("unable to get loadbalancer: %v", err) } @@ -387,7 +407,7 @@ func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing lbc.ctx.Recorder(ing.Namespace).Eventf(currIng, apiv1.EventTypeNormal, "CREATE", "ip: %v", ip) } } - annotations, err := loadbalancers.GetLBAnnotations(l7, currIng.Annotations, lbc.CloudClusterManager.backendPool) + annotations, err := loadbalancers.GetLBAnnotations(l7, currIng.Annotations, lbc.backendPool) if err != nil { return err } @@ -453,3 +473,63 @@ func (lbc *LoadBalancerController) ToSvcPorts(ings *extensions.IngressList) []ut } return knownPorts } + +func (lbc *LoadBalancerController) ensureInstanceGroupsAndPorts(svcPorts []utils.ServicePort, nodeNames []string) ([]*compute.InstanceGroup, error) { + ports := []int64{} + for _, p := range uniq(svcPorts) { + if !p.NEGEnabled { + ports = append(ports, p.NodePort) + } + } + + // Create instance groups and set named ports. + igs, err := lbc.instancePool.EnsureInstanceGroupsAndPorts(lbc.ctx.ClusterNamer.InstanceGroup(), ports) + if err != nil { + return nil, err + } + // Add/remove instances to the instance groups. + if err = lbc.instancePool.Sync(nodeNames); err != nil { + return nil, err + } + + return igs, nil +} + +// gc garbage collects unused resources. +// - lbNames are the names of L7 loadbalancers we wish to exist. 
Those not in
+// this list are removed from the cloud.
+// - nodePorts are the ports for which we want BackendServices. BackendServices
+// for ports not in this list are deleted.
+// This method ignores googleapi 404 errors (StatusNotFound).
+func (lbc *LoadBalancerController) gc(lbNames []string, nodePorts []utils.ServicePort) error {
+	// On GC:
+	// * Loadbalancers need to get deleted before backends.
+	// * Backends are refcounted in a shared pool.
+	// * We always want to GC backends even if there was an error in GCing
+	//   loadbalancers, because the next Sync could rely on the GC for quota.
+	// * There are at least 2 cases for backend GC:
+	//   1. The loadbalancer has been deleted.
+	//   2. An update to the url map drops the refcount of a backend. This can
+	//      happen when an Ingress is updated, if we don't GC after the update
+	//      we'll leak the backend.
+	lbErr := lbc.l7Pool.GC(lbNames)
+	beErr := lbc.backendPool.GC(nodePorts)
+	if lbErr != nil {
+		return lbErr
+	}
+	if beErr != nil {
+		return beErr
+	}
+
+	// TODO(ingress#120): Move this to the backend pool so it mirrors creation
+	if len(lbNames) == 0 {
+		igName := lbc.ctx.ClusterNamer.InstanceGroup()
+		glog.Infof("Deleting instance group %v", igName)
+		if err := lbc.instancePool.DeleteInstanceGroup(igName); err != nil {
+			return err
+		}
+		glog.V(2).Infof("Shutting down firewall as there are no loadbalancers")
+	}
+
+	return nil
+}
diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go
index fad23349cd..7317e38db0 100644
--- a/pkg/controller/controller_test.go
+++ b/pkg/controller/controller_test.go
@@ -26,10 +26,14 @@ import (
 	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/client-go/kubernetes/fake"
 	backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned/fake"
+	"k8s.io/ingress-gce/pkg/instances"
+	"k8s.io/ingress-gce/pkg/loadbalancers"
 	"k8s.io/ingress-gce/pkg/test"
 	"k8s.io/ingress-gce/pkg/utils"
+	"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
 
 	"k8s.io/ingress-gce/pkg/context"
 )
@@ -40,20 +44,28 @@ var (
 )
 
 // newLoadBalancerController create a loadbalancer controller. 
-func newLoadBalancerController(t *testing.T, cm *fakeClusterManager) *LoadBalancerController { +func newLoadBalancerController() *LoadBalancerController { kubeClient := fake.NewSimpleClientset() backendConfigClient := backendconfigclient.NewSimpleClientset() + fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues()) + namer := utils.NewNamer(clusterUID, "") stopCh := make(chan struct{}) ctxConfig := context.ControllerContextConfig{ - NEGEnabled: true, - BackendConfigEnabled: false, - Namespace: api_v1.NamespaceAll, - ResyncPeriod: 1 * time.Minute, - DefaultBackendSvcPortID: test.DefaultBeSvcPort.ID, + NEGEnabled: true, + BackendConfigEnabled: false, + Namespace: api_v1.NamespaceAll, + ResyncPeriod: 1 * time.Minute, + DefaultBackendSvcPortID: test.DefaultBeSvcPort.ID, + HealthCheckPath: "/", + DefaultBackendHealthCheckPath: "/healthz", } - ctx := context.NewControllerContext(kubeClient, backendConfigClient, cm.fakeBackends, ctxConfig) - lbc := NewLoadBalancerController(ctx, cm.ClusterManager, stopCh) + ctx := context.NewControllerContext(kubeClient, backendConfigClient, fakeGCE, namer, ctxConfig) + lbc := NewLoadBalancerController(ctx, stopCh) + // TODO(rramkumar): Fix this so we don't have to override with our fake + lbc.instancePool = instances.NewNodePool(instances.NewFakeInstanceGroups(sets.NewString(), namer), namer) + lbc.l7Pool = loadbalancers.NewLoadBalancerPool(loadbalancers.NewFakeLoadBalancers(clusterUID, namer), namer) + lbc.instancePool.Init(&instances.FakeZoneLister{Zones: []string{"zone-a"}}) lbc.hasSynced = func() bool { return true } @@ -115,8 +127,7 @@ func backend(name string, port intstr.IntOrString) extensions.IngressBackend { // TestIngressSyncError asserts that `sync` will bubble an error when an ingress cannot be synced // due to configuration problems. func TestIngressSyncError(t *testing.T) { - cm := NewFakeClusterManager(clusterUID, "") - lbc := newLoadBalancerController(t, cm) + lbc := newLoadBalancerController() someBackend := backend("my-service", intstr.FromInt(80)) ing := test.NewIngress(types.NamespacedName{Name: "my-ingress", Namespace: "default"}, @@ -139,8 +150,7 @@ func TestIngressSyncError(t *testing.T) { // TestIngressCreateDelete asserts that `sync` will not return an error for a good ingress config // and will not return an error when the ingress is deleted. func TestIngressCreateDelete(t *testing.T) { - cm := NewFakeClusterManager(clusterUID, "") - lbc := newLoadBalancerController(t, cm) + lbc := newLoadBalancerController() svc := test.NewService(types.NamespacedName{Name: "my-service", Namespace: "default"}, api_v1.ServiceSpec{ Type: api_v1.ServiceTypeNodePort, @@ -174,8 +184,7 @@ func TestIngressCreateDelete(t *testing.T) { // TestEnsureMCIngress asserts a multi-cluster ingress will result with correct status annotations. func TestEnsureMCIngress(t *testing.T) { - cm := NewFakeClusterManager(clusterUID, "") - lbc := newLoadBalancerController(t, cm) + lbc := newLoadBalancerController() svc := test.NewService(types.NamespacedName{Name: "my-service", Namespace: "default"}, api_v1.ServiceSpec{ Type: api_v1.ServiceTypeNodePort, diff --git a/pkg/controller/fakes.go b/pkg/controller/fakes.go deleted file mode 100644 index afdf8a1816..0000000000 --- a/pkg/controller/fakes.go +++ /dev/null @@ -1,71 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package controller - -import ( - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" - - "k8s.io/ingress-gce/pkg/backends" - "k8s.io/ingress-gce/pkg/healthchecks" - "k8s.io/ingress-gce/pkg/instances" - "k8s.io/ingress-gce/pkg/loadbalancers" - "k8s.io/ingress-gce/pkg/neg" - "k8s.io/ingress-gce/pkg/test" - "k8s.io/ingress-gce/pkg/utils" -) - -var ( - testSrcRanges = []string{"1.1.1.1/20"} -) - -// ClusterManager fake -type fakeClusterManager struct { - *ClusterManager - fakeLbs *loadbalancers.FakeLoadBalancers - fakeBackends *gce.GCECloud - fakeIGs *instances.FakeInstanceGroups - Namer *utils.Namer -} - -// NewFakeClusterManager creates a new fake ClusterManager. -func NewFakeClusterManager(clusterName, firewallName string) *fakeClusterManager { - namer := utils.NewNamer(clusterName, firewallName) - fakeLbs := loadbalancers.NewFakeLoadBalancers(clusterName, namer) - fakeBackends := gce.FakeGCECloud(gce.DefaultTestClusterValues()) - fakeIGs := instances.NewFakeInstanceGroups(sets.NewString(), namer) - fakeHCP := healthchecks.NewFakeHealthCheckProvider() - fakeNEG := neg.NewFakeNetworkEndpointGroupCloud("test-subnet", "test-network") - - nodePool := instances.NewNodePool(fakeIGs, namer) - nodePool.Init(&instances.FakeZoneLister{Zones: []string{"zone-a"}}) - - healthChecker := healthchecks.NewHealthChecker(fakeHCP, "/", "/healthz", namer, test.DefaultBeSvcPort.ID.Service) - - backendPool := backends.NewBackendPool( - fakeBackends, - fakeNEG, - healthChecker, nodePool, namer, false, false) - l7Pool := loadbalancers.NewLoadBalancerPool(fakeLbs, namer) - cm := &ClusterManager{ - ClusterNamer: namer, - instancePool: nodePool, - backendPool: backendPool, - l7Pool: l7Pool, - } - return &fakeClusterManager{cm, fakeLbs, fakeBackends, fakeIGs, namer} -} diff --git a/pkg/controller/node.go b/pkg/controller/node.go index d5014b32ca..366169a17d 100644 --- a/pkg/controller/node.go +++ b/pkg/controller/node.go @@ -23,6 +23,7 @@ import ( listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/ingress-gce/pkg/context" + "k8s.io/ingress-gce/pkg/instances" "k8s.io/ingress-gce/pkg/utils" ) @@ -37,15 +38,15 @@ type NodeController struct { lister cache.Indexer // queue is the TaskQueue used to manage the node worker updates. queue utils.TaskQueue - // cm is the shared ClusterManager interface. - cm *ClusterManager + // instancePool is a NodePool to manage kubernetes nodes. + instancePool instances.NodePool } // NewNodeController returns a new node update controller. 
-func NewNodeController(ctx *context.ControllerContext, cm *ClusterManager) *NodeController { +func NewNodeController(ctx *context.ControllerContext, instancePool instances.NodePool) *NodeController { c := &NodeController{ - lister: ctx.NodeInformer.GetIndexer(), - cm: cm, + lister: ctx.NodeInformer.GetIndexer(), + instancePool: instancePool, } c.queue = utils.NewPeriodicTaskQueue("nodes", c.sync) @@ -80,5 +81,5 @@ func (c *NodeController) sync(key string) error { if err != nil { return err } - return c.cm.instancePool.Sync(nodeNames) + return c.instancePool.Sync(nodeNames) } diff --git a/pkg/controller/translator/translator.go b/pkg/controller/translator/translator.go index 8f70e3e3a3..254b6f21d9 100644 --- a/pkg/controller/translator/translator.go +++ b/pkg/controller/translator/translator.go @@ -40,14 +40,13 @@ import ( ) // NewTranslator returns a new Translator. -func NewTranslator(namer *utils.Namer, ctx *context.ControllerContext) *Translator { - return &Translator{namer, ctx} +func NewTranslator(ctx *context.ControllerContext) *Translator { + return &Translator{ctx} } // Translator helps with kubernetes -> gce api conversion. type Translator struct { - namer *utils.Namer - ctx *context.ControllerContext + ctx *context.ControllerContext } // getServicePort looks in the svc store for a matching service:port, diff --git a/pkg/controller/translator/translator_test.go b/pkg/controller/translator/translator_test.go index afa32e9f7a..91b4b2e932 100644 --- a/pkg/controller/translator/translator_test.go +++ b/pkg/controller/translator/translator_test.go @@ -50,16 +50,17 @@ func fakeTranslator(negEnabled, backendConfigEnabled bool) *Translator { namer := utils.NewNamer("uid1", "") ctxConfig := context.ControllerContextConfig{ - NEGEnabled: negEnabled, - BackendConfigEnabled: backendConfigEnabled, - Namespace: apiv1.NamespaceAll, - ResyncPeriod: 1 * time.Second, - DefaultBackendSvcPortID: defaultBackend, + NEGEnabled: negEnabled, + BackendConfigEnabled: backendConfigEnabled, + Namespace: apiv1.NamespaceAll, + ResyncPeriod: 1 * time.Second, + DefaultBackendSvcPortID: defaultBackend, + HealthCheckPath: "/", + DefaultBackendHealthCheckPath: "/healthz", } - ctx := context.NewControllerContext(client, backendConfigClient, nil, ctxConfig) + ctx := context.NewControllerContext(client, backendConfigClient, nil, namer, ctxConfig) gce := &Translator{ - namer: namer, - ctx: ctx, + ctx: ctx, } return gce } diff --git a/pkg/controller/utils_test.go b/pkg/controller/utils_test.go index 179cd325e3..d745fd95f1 100644 --- a/pkg/controller/utils_test.go +++ b/pkg/controller/utils_test.go @@ -25,12 +25,10 @@ import ( api_v1 "k8s.io/api/core/v1" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" + //"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/ingress-gce/pkg/annotations" - "k8s.io/ingress-gce/pkg/firewalls" - "k8s.io/ingress-gce/pkg/flags" "k8s.io/ingress-gce/pkg/utils" ) @@ -39,8 +37,7 @@ import ( var firstPodCreationTime = time.Date(2006, 01, 02, 15, 04, 05, 0, time.UTC) func TestZoneListing(t *testing.T) { - cm := NewFakeClusterManager(flags.DefaultClusterUID, firewalls.DefaultFirewallName) - lbc := newLoadBalancerController(t, cm) + lbc := newLoadBalancerController() zoneToNode := map[string][]string{ "zone-1": {"n1"}, "zone-2": {"n2"}, @@ -63,9 +60,10 @@ func TestZoneListing(t *testing.T) { } } +/* +* TODO(rramkumar): Move to pkg/instances in another PR func TestInstancesAddedToZones(t *testing.T) { - cm 
:= NewFakeClusterManager(flags.DefaultClusterUID, firewalls.DefaultFirewallName) - lbc := newLoadBalancerController(t, cm) + lbc := newLoadBalancerController() zoneToNode := map[string][]string{ "zone-1": {"n1", "n2"}, "zone-2": {"n3"}, @@ -74,14 +72,14 @@ func TestInstancesAddedToZones(t *testing.T) { // Create 2 igs, one per zone. testIG := "test-ig" - lbc.CloudClusterManager.instancePool.EnsureInstanceGroupsAndPorts(testIG, []int64{int64(3001)}) + lbc.instancePool.EnsureInstanceGroupsAndPorts(testIG, []int64{int64(3001)}) // node pool syncs kube-nodes, this will add them to both igs. - lbc.CloudClusterManager.instancePool.Sync([]string{"n1", "n2", "n3"}) - gotZonesToNode := cm.fakeIGs.GetInstancesByZone() + lbc.instancePool.Sync([]string{"n1", "n2", "n3"}) + gotZonesToNode := lbc.instancePool.GetInstancesByZone() for z, nodeNames := range zoneToNode { - if ig, err := cm.fakeIGs.GetInstanceGroup(testIG, z); err != nil { + if ig, err := lbc.instancePool.GetInstanceGroup(testIG, z); err != nil { t.Errorf("Failed to find ig %v in zone %v, found %+v: %v", testIG, z, ig, err) } expNodes := sets.NewString(nodeNames...) @@ -91,6 +89,7 @@ func TestInstancesAddedToZones(t *testing.T) { } } } +*/ func addNodes(lbc *LoadBalancerController, zoneToNode map[string][]string) { for zone, nodes := range zoneToNode { @@ -111,7 +110,7 @@ func addNodes(lbc *LoadBalancerController, zoneToNode map[string][]string) { lbc.nodeLister.Add(n) } } - lbc.CloudClusterManager.instancePool.Init(lbc.Translator) + lbc.instancePool.Init(lbc.Translator) } func getProbePath(p *api_v1.Probe) string { diff --git a/pkg/firewalls/controller.go b/pkg/firewalls/controller.go index 9fe162f1c4..3e028287cd 100644 --- a/pkg/firewalls/controller.go +++ b/pkg/firewalls/controller.go @@ -54,16 +54,15 @@ type FirewallController struct { // NewFirewallController returns a new firewall controller. 
func NewFirewallController( ctx *context.ControllerContext, - namer *utils.Namer, portRanges []string) *FirewallController { - firewallPool := NewFirewallPool(ctx.Cloud, namer, gce.LoadBalancerSrcRanges(), portRanges) + firewallPool := NewFirewallPool(ctx.Cloud, ctx.ClusterNamer, gce.LoadBalancerSrcRanges(), portRanges) fwc := &FirewallController{ ctx: ctx, firewallPool: firewallPool, ingLister: utils.StoreToIngressLister{Store: ctx.IngressInformer.GetStore()}, - translator: translator.NewTranslator(namer, ctx), + translator: translator.NewTranslator(ctx), nodeLister: ctx.NodeInformer.GetIndexer(), hasSynced: ctx.HasSynced, } diff --git a/pkg/firewalls/controller_test.go b/pkg/firewalls/controller_test.go index ccc81a34d1..9a4c412009 100644 --- a/pkg/firewalls/controller_test.go +++ b/pkg/firewalls/controller_test.go @@ -47,8 +47,8 @@ func newFirewallController() *FirewallController { DefaultBackendSvcPortID: test.DefaultBeSvcPort.ID, } - ctx := context.NewControllerContext(kubeClient, backendConfigClient, fakeGCE, ctxConfig) - fwc := NewFirewallController(ctx, namer, []string{"30000-32767"}) + ctx := context.NewControllerContext(kubeClient, backendConfigClient, fakeGCE, namer, ctxConfig) + fwc := NewFirewallController(ctx, []string{"30000-32767"}) fwc.hasSynced = func() bool { return true } return fwc diff --git a/pkg/neg/controller_test.go b/pkg/neg/controller_test.go index 898f8b8749..6c1a960cea 100644 --- a/pkg/neg/controller_test.go +++ b/pkg/neg/controller_test.go @@ -44,6 +44,7 @@ var ( func newTestController(kubeClient kubernetes.Interface) *Controller { backendConfigClient := backendconfigclient.NewSimpleClientset() + namer := utils.NewNamer(ClusterID, "") ctxConfig := context.ControllerContextConfig{ NEGEnabled: true, BackendConfigEnabled: false, @@ -51,12 +52,12 @@ func newTestController(kubeClient kubernetes.Interface) *Controller { ResyncPeriod: 1 * time.Second, DefaultBackendSvcPortID: defaultBackend, } - context := context.NewControllerContext(kubeClient, backendConfigClient, nil, ctxConfig) + context := context.NewControllerContext(kubeClient, backendConfigClient, nil, namer, ctxConfig) controller := NewController( NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network"), context, NewFakeZoneGetter(), - utils.NewNamer(CluseterID, ""), + namer, 1*time.Second, ) return controller diff --git a/pkg/neg/manager_test.go b/pkg/neg/manager_test.go index d17297b0a0..5fed29c1ef 100644 --- a/pkg/neg/manager_test.go +++ b/pkg/neg/manager_test.go @@ -32,11 +32,12 @@ import ( ) const ( - CluseterID = "clusterid" + ClusterID = "clusterid" ) func NewTestSyncerManager(kubeClient kubernetes.Interface) *syncerManager { backendConfigClient := backendconfigclient.NewSimpleClientset() + namer := utils.NewNamer(ClusterID, "") ctxConfig := context.ControllerContextConfig{ NEGEnabled: true, BackendConfigEnabled: false, @@ -44,9 +45,9 @@ func NewTestSyncerManager(kubeClient kubernetes.Interface) *syncerManager { ResyncPeriod: 1 * time.Second, DefaultBackendSvcPortID: defaultBackend, } - context := context.NewControllerContext(kubeClient, backendConfigClient, nil, ctxConfig) + context := context.NewControllerContext(kubeClient, backendConfigClient, nil, namer, ctxConfig) manager := newSyncerManager( - utils.NewNamer(CluseterID, ""), + namer, record.NewFakeRecorder(100), NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network"), NewFakeZoneGetter(), diff --git a/pkg/neg/syncer_test.go b/pkg/neg/syncer_test.go index d1cb7aa05f..55db559663 100644 --- a/pkg/neg/syncer_test.go +++ 
b/pkg/neg/syncer_test.go @@ -13,6 +13,7 @@ import ( "k8s.io/client-go/tools/record" backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned/fake" "k8s.io/ingress-gce/pkg/context" + "k8s.io/ingress-gce/pkg/utils" ) const ( @@ -25,6 +26,7 @@ const ( func NewTestSyncer() *syncer { kubeClient := fake.NewSimpleClientset() backendConfigClient := backendconfigclient.NewSimpleClientset() + namer := utils.NewNamer(ClusterID, "") ctxConfig := context.ControllerContextConfig{ NEGEnabled: true, BackendConfigEnabled: false, @@ -32,7 +34,7 @@ func NewTestSyncer() *syncer { ResyncPeriod: 1 * time.Second, DefaultBackendSvcPortID: defaultBackend, } - context := context.NewControllerContext(kubeClient, backendConfigClient, nil, ctxConfig) + context := context.NewControllerContext(kubeClient, backendConfigClient, nil, namer, ctxConfig) svcPort := servicePort{ namespace: testServiceNamespace, name: testServiceName,
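
For readers skimming the patch, the Go sketch below condenses how the pieces fit together after this refactor, based only on the hunks above: the Namer is created in main and handed to the ControllerContext, and the instance/backend/L7 pools that previously lived in ClusterManager are now built inside NewLoadBalancerController. This is an illustrative sketch, not part of the commit: client construction and the NEG controller are elided, import paths follow the repo layout seen in the diff, and the firewall controller's Run call is assumed where the hunks do not show it.

package main

import (
	"github.com/golang/glog"

	"k8s.io/client-go/kubernetes"

	"k8s.io/ingress-gce/cmd/glbc/app"
	backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned"
	"k8s.io/ingress-gce/pkg/context"
	"k8s.io/ingress-gce/pkg/controller"
	"k8s.io/ingress-gce/pkg/firewalls"
	"k8s.io/ingress-gce/pkg/flags"
	"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
)

// runGLBC sketches the post-refactor wiring from cmd/glbc/main.go.
// Clients are passed in because their construction is unchanged by this patch.
func runGLBC(kubeClient kubernetes.Interface, backendConfigClient backendconfigclient.Interface, cloud *gce.GCECloud) {
	// The Namer now lives in main and is injected into the ControllerContext.
	namer, err := app.NewNamer(kubeClient, flags.F.ClusterName, firewalls.DefaultFirewallName)
	if err != nil {
		glog.Fatalf("app.NewNamer() = %v", err)
	}

	ctxConfig := context.ControllerContextConfig{
		NEGEnabled:                    flags.F.Features.NEG,
		BackendConfigEnabled:          flags.F.EnableBackendConfig,
		Namespace:                     flags.F.WatchNamespace,
		ResyncPeriod:                  flags.F.ResyncPeriod,
		DefaultBackendSvcPortID:       app.DefaultBackendServicePortID(kubeClient),
		HealthCheckPath:               flags.F.HealthCheckPath,
		DefaultBackendHealthCheckPath: flags.F.DefaultSvcHealthCheckPath,
	}
	ctx := context.NewControllerContext(kubeClient, backendConfigClient, cloud, namer, ctxConfig)

	// The controller builds its own instance, backend and L7 pools; ClusterManager is gone.
	stopCh := make(chan struct{})
	lbc := controller.NewLoadBalancerController(ctx, stopCh)
	fwc := firewalls.NewFirewallController(ctx, flags.F.NodePortRanges.Values())

	go fwc.Run(stopCh) // assumed Run signature; the actual call sits outside the hunks shown

	ctx.Start(stopCh)
	lbc.Init() // wires the Translator into the instance and backend pools
	lbc.Run()
}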