diff --git a/controllers/gce/backends/backends.go b/controllers/gce/backends/backends.go index 8e8dd3706e..5393bebf93 100644 --- a/controllers/gce/backends/backends.go +++ b/controllers/gce/backends/backends.go @@ -211,15 +211,19 @@ func (b *Backends) create(namedPort *compute.NamedPort, hcLink string, sp Servic } // Add will get or create a Backend for the given port. -func (b *Backends) Add(p ServicePort) error { +// Uses the given instance groups if non-nil, else creates instance groups. +func (b *Backends) Add(p ServicePort, igs []*compute.InstanceGroup) error { // We must track the port even if creating the backend failed, because // we might've created a health-check for it. be := &compute.BackendService{} defer func() { b.snapshotter.Add(portKey(p.Port), be) }() - igs, namedPort, err := instances.CreateInstanceGroups(b.nodePool, b.namer, p.Port) - if err != nil { - return err + var err error + if igs == nil { + igs, _, err = instances.CreateInstanceGroups(b.nodePool, b.namer, p.Port) + if err != nil { + return err + } } // Ensure health check for backend service exists @@ -232,6 +236,7 @@ func (b *Backends) Add(p ServicePort) error { pName := b.namer.BeName(p.Port) be, _ = b.Get(p.Port) if be == nil { + namedPort := utils.GetNamedPort(p.Port) glog.V(2).Infof("Creating backend service for port %v named port %v", p.Port, namedPort) be, err = b.create(namedPort, hcLink, p, pName) if err != nil { @@ -381,12 +386,12 @@ func (b *Backends) edgeHop(be *compute.BackendService, igs []*compute.InstanceGr } // Sync syncs backend services corresponding to ports in the given list. -func (b *Backends) Sync(svcNodePorts []ServicePort) error { +func (b *Backends) Sync(svcNodePorts []ServicePort, igs []*compute.InstanceGroup) error { glog.V(3).Infof("Sync: backends %v", svcNodePorts) // create backends for new ports, perform an edge hop for existing ports for _, port := range svcNodePorts { - if err := b.Add(port); err != nil { + if err := b.Add(port, igs); err != nil { return err } } diff --git a/controllers/gce/backends/backends_test.go b/controllers/gce/backends/backends_test.go index ba1331558d..8d116ac6ed 100644 --- a/controllers/gce/backends/backends_test.go +++ b/controllers/gce/backends/backends_test.go @@ -80,7 +80,7 @@ func TestBackendPoolAdd(t *testing.T) { // Add a backend for a port, then re-add the same port and // make sure it corrects a broken link from the backend to // the instance group. - err := pool.Add(nodePort) + err := pool.Add(nodePort, nil) if err != nil { t.Fatalf("Did not find expect error when adding a nodeport: %v, err: %v", nodePort, err) } @@ -143,7 +143,7 @@ func TestHealthCheckMigration(t *testing.T) { hcp.CreateHttpHealthCheck(legacyHC) // Add the service port to the backend pool - pool.Add(p) + pool.Add(p, nil) // Assert the proper health check was created hc, _ := pool.healthChecker.Get(p.Port) @@ -168,7 +168,7 @@ func TestBackendPoolUpdate(t *testing.T) { namer := utils.Namer{} p := ServicePort{Port: 3000, Protocol: utils.ProtocolHTTP} - pool.Add(p) + pool.Add(p, nil) beName := namer.BeName(p.Port) be, err := f.GetGlobalBackendService(beName) @@ -188,7 +188,7 @@ func TestBackendPoolUpdate(t *testing.T) { // Update service port to encrypted p.Protocol = utils.ProtocolHTTPS - pool.Sync([]ServicePort{p}) + pool.Sync([]ServicePort{p}, nil) be, err = f.GetGlobalBackendService(beName) if err != nil { @@ -214,7 +214,7 @@ func TestBackendPoolChaosMonkey(t *testing.T) { namer := utils.Namer{} nodePort := ServicePort{Port: 8080, Protocol: utils.ProtocolHTTP} - pool.Add(nodePort) + pool.Add(nodePort, nil) beName := namer.BeName(nodePort.Port) be, _ := f.GetGlobalBackendService(beName) @@ -227,7 +227,7 @@ func TestBackendPoolChaosMonkey(t *testing.T) { f.calls = []int{} f.UpdateGlobalBackendService(be) - pool.Add(nodePort) + pool.Add(nodePort, nil) for _, call := range f.calls { if call == utils.Create { t.Fatalf("Unexpected create for existing backend service") @@ -260,9 +260,9 @@ func TestBackendPoolSync(t *testing.T) { f := NewFakeBackendServices(noOpErrFunc) fakeIGs := instances.NewFakeInstanceGroups(sets.NewString()) pool, _ := newTestJig(f, fakeIGs, true) - pool.Add(ServicePort{Port: 81}) - pool.Add(ServicePort{Port: 90}) - if err := pool.Sync(svcNodePorts); err != nil { + pool.Add(ServicePort{Port: 81}, nil) + pool.Add(ServicePort{Port: 90}, nil) + if err := pool.Sync(svcNodePorts, nil); err != nil { t.Errorf("Expected backend pool to sync, err: %v", err) } if err := pool.GC(svcNodePorts); err != nil { @@ -361,7 +361,7 @@ func TestBackendPoolDeleteLegacyHealthChecks(t *testing.T) { }) // Have pool sync the above backend service - bp.Add(ServicePort{Port: 80, Protocol: utils.ProtocolHTTPS}) + bp.Add(ServicePort{Port: 80, Protocol: utils.ProtocolHTTPS}, nil) // Verify the legacy health check has been deleted _, err = hcp.GetHttpHealthCheck(beName) @@ -388,7 +388,7 @@ func TestBackendPoolShutdown(t *testing.T) { namer := utils.Namer{} // Add a backend-service and verify that it doesn't exist after Shutdown() - pool.Add(ServicePort{Port: 80}) + pool.Add(ServicePort{Port: 80}, nil) pool.Shutdown() if _, err := f.GetGlobalBackendService(namer.BeName(80)); err == nil { t.Fatalf("%v", err) @@ -402,7 +402,7 @@ func TestBackendInstanceGroupClobbering(t *testing.T) { namer := utils.Namer{} // This will add the instance group k8s-ig to the instance pool - pool.Add(ServicePort{Port: 80}) + pool.Add(ServicePort{Port: 80}, nil) be, err := f.GetGlobalBackendService(namer.BeName(80)) if err != nil { @@ -420,7 +420,7 @@ func TestBackendInstanceGroupClobbering(t *testing.T) { } // Make sure repeated adds don't clobber the inserted instance group - pool.Add(ServicePort{Port: 80}) + pool.Add(ServicePort{Port: 80}, nil) be, err = f.GetGlobalBackendService(namer.BeName(80)) if err != nil { t.Fatalf("%v", err) @@ -462,7 +462,7 @@ func TestBackendCreateBalancingMode(t *testing.T) { return nil } - pool.Add(nodePort) + pool.Add(nodePort, nil) be, err := f.GetGlobalBackendService(namer.BeName(nodePort.Port)) if err != nil { t.Fatalf("%v", err) diff --git a/controllers/gce/backends/interfaces.go b/controllers/gce/backends/interfaces.go index 2153f35056..586ceb17c5 100644 --- a/controllers/gce/backends/interfaces.go +++ b/controllers/gce/backends/interfaces.go @@ -30,10 +30,10 @@ type probeProvider interface { // as gce backendServices, and sync them through the BackendServices interface. type BackendPool interface { Init(p probeProvider) - Add(port ServicePort) error + Add(port ServicePort, igs []*compute.InstanceGroup) error Get(port int64) (*compute.BackendService, error) Delete(port int64) error - Sync(ports []ServicePort) error + Sync(ports []ServicePort, igs []*compute.InstanceGroup) error GC(ports []ServicePort) error Shutdown() error Status(name string) string diff --git a/controllers/gce/controller/cluster_manager.go b/controllers/gce/controller/cluster_manager.go index 902c32354c..3b95d8df7b 100644 --- a/controllers/gce/controller/cluster_manager.go +++ b/controllers/gce/controller/cluster_manager.go @@ -116,9 +116,10 @@ func (c *ClusterManager) shutdown() error { // instance groups. // - backendServicePorts are the ports for which we require BackendServices. // - namedPorts are the ports which must be opened on instance groups. +// Returns the list of all instance groups corresponding to the given loadbalancers. // If in performing the checkpoint the cluster manager runs out of quota, a // googleapi 403 is returned. -func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, backendServicePorts []backends.ServicePort, namedPorts []backends.ServicePort) error { +func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, backendServicePorts []backends.ServicePort, namedPorts []backends.ServicePort) ([]*compute.InstanceGroup, error) { if len(namedPorts) != 0 { // Add the default backend node port to the list of named ports for instance groups. namedPorts = append(namedPorts, c.defaultBackendNodePort) @@ -129,19 +130,19 @@ func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeName namedPorts = uniq(namedPorts) backendServicePorts = uniq(backendServicePorts) // Create Instance Groups. - _, err := c.CreateInstanceGroups(namedPorts) + igs, err := c.CreateInstanceGroups(namedPorts) if err != nil { - return err + return igs, err } - if err := c.backendPool.Sync(backendServicePorts); err != nil { - return err + if err := c.backendPool.Sync(backendServicePorts, igs); err != nil { + return igs, err } if err := c.SyncNodesInInstanceGroups(nodeNames); err != nil { - return err + return igs, err } if err := c.l7Pool.Sync(lbs); err != nil { - return err + return igs, err } // TODO: Manage default backend and its firewall rule in a centralized way. @@ -160,10 +161,10 @@ func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeName np = append(np, p.Port) } if err := c.firewallPool.Sync(np, nodeNames); err != nil { - return err + return igs, err } - return nil + return igs, nil } func (c *ClusterManager) CreateInstanceGroups(servicePorts []backends.ServicePort) ([]*compute.InstanceGroup, error) { diff --git a/controllers/gce/controller/controller.go b/controllers/gce/controller/controller.go index 0bf1a69b5c..ce6be68852 100644 --- a/controllers/gce/controller/controller.go +++ b/controllers/gce/controller/controller.go @@ -24,6 +24,8 @@ import ( "github.com/golang/glog" + compute "google.golang.org/api/compute/v1" + apiv1 "k8s.io/api/core/v1" extensions "k8s.io/api/extensions/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -36,7 +38,6 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" - "k8s.io/ingress/controllers/gce/backends" "k8s.io/ingress/controllers/gce/loadbalancers" ) @@ -319,10 +320,10 @@ func (lbc *LoadBalancerController) sync(key string) (err error) { } glog.V(3).Infof("Finished syncing %v", key) }() - + igs := []*compute.InstanceGroup{} // Record any errors during sync and throw a single error at the end. This // allows us to free up associated cloud resources ASAP. - if err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, gceNodePorts, allNodePorts); err != nil { + if igs, err = lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, gceNodePorts, allNodePorts); err != nil { // TODO: Implement proper backoff for the queue. eventMsg := "GCE" if ingExists { @@ -342,13 +343,6 @@ func (lbc *LoadBalancerController) sync(key string) (err error) { if ing.Annotations == nil { ing.Annotations = map[string]string{} } - // Since we just created instance groups in Checkpoint, calling create - // instance groups again should just return names of the existing - // instance groups. It does not matter which nodePort we pass as argument. - igs, err := lbc.CloudClusterManager.CreateInstanceGroups([]backends.ServicePort{allNodePorts[0]}) - if err != nil { - return fmt.Errorf("error in creating instance groups: %v", err) - } err = setInstanceGroupsAnnotation(ing.Annotations, igs) if err != nil { return err diff --git a/controllers/gce/instances/instances.go b/controllers/gce/instances/instances.go index 94664b065f..fd54de0340 100644 --- a/controllers/gce/instances/instances.go +++ b/controllers/gce/instances/instances.go @@ -63,8 +63,7 @@ func (i *Instances) Init(zl zoneLister) { // all of which have the exact same named port. func (i *Instances) AddInstanceGroup(name string, port int64) ([]*compute.InstanceGroup, *compute.NamedPort, error) { igs := []*compute.InstanceGroup{} - // TODO: move port naming to namer - namedPort := &compute.NamedPort{Name: fmt.Sprintf("port%v", port), Port: port} + namedPort := utils.GetNamedPort(port) zones, err := i.ListZones() if err != nil { diff --git a/controllers/gce/loadbalancers/loadbalancers.go b/controllers/gce/loadbalancers/loadbalancers.go index e8ce542280..15ded2562b 100644 --- a/controllers/gce/loadbalancers/loadbalancers.go +++ b/controllers/gce/loadbalancers/loadbalancers.go @@ -169,7 +169,7 @@ func (l *L7s) Sync(lbs []*L7RuntimeInfo) error { // Lazily create a default backend so we don't tax users who don't care // about Ingress by consuming 1 of their 3 GCE BackendServices. This // BackendService is GC'd when there are no more Ingresses. - if err := l.defaultBackendPool.Add(l.defaultBackendNodePort); err != nil { + if err := l.defaultBackendPool.Add(l.defaultBackendNodePort, nil); err != nil { return err } defaultBackend, err := l.defaultBackendPool.Get(l.defaultBackendNodePort.Port) diff --git a/controllers/gce/utils/utils.go b/controllers/gce/utils/utils.go index 237943a3a2..c0486f5570 100644 --- a/controllers/gce/utils/utils.go +++ b/controllers/gce/utils/utils.go @@ -353,3 +353,9 @@ func CompareLinks(l1, l2 string) bool { // FakeIngressRuleValueMap is a convenience type used by multiple submodules // that share the same testing methods. type FakeIngressRuleValueMap map[string]string + +// GetNamedPort creates the NamedPort API object for the given port. +func GetNamedPort(port int64) *compute.NamedPort { + // TODO: move port naming to namer + return &compute.NamedPort{Name: fmt.Sprintf("port%v", port), Port: port} +}