diff --git a/controllers/gce/backends/backends.go b/controllers/gce/backends/backends.go index 49b632efe4..cdd2dd68dd 100644 --- a/controllers/gce/backends/backends.go +++ b/controllers/gce/backends/backends.go @@ -210,24 +210,43 @@ func (b *Backends) create(namedPort *compute.NamedPort, hcLink string, sp Servic return b.Get(namedPort.Port) } -// Add will get or create a Backend for the given port. +// Ensure will update or create Backends for the given ports. // Uses the given instance groups if non-nil, else creates instance groups. -func (b *Backends) Add(p ServicePort, igs []*compute.InstanceGroup) error { - // We must track the port even if creating the backend failed, because - // we might've created a health-check for it. - be := &compute.BackendService{} - defer func() { b.snapshotter.Add(portKey(p.Port), be) }() - - var err error +func (b *Backends) Ensure(svcPorts []ServicePort, igs []*compute.InstanceGroup) error { + glog.V(3).Infof("Sync: backends %v", svcPorts) // Ideally callers should pass the instance groups to prevent recomputing them here. // Igs can be nil in scenarios where we do not have instance groups such as // while syncing default backend service. if igs == nil { - igs, _, err = instances.EnsureInstanceGroupsAndPorts(b.nodePool, b.namer, p.Port) + ports := []int64{} + for _, p := range svcPorts { + ports = append(ports, p.Port) + } + var err error + igs, _, err = instances.EnsureInstanceGroupsAndPorts(b.nodePool, b.namer, ports) if err != nil { return err } } + // create backends for new ports, perform an edge hop for existing ports + for _, port := range svcPorts { + if err := b.ensureBackendService(port, igs); err != nil { + return err + } + } + return nil +} + +// ensureBackendService will update or create a Backend for the given port. +// It assumes that the instance groups have been created and required named port has been added. +// If not, then Ensure should be called instead. +func (b *Backends) ensureBackendService(p ServicePort, igs []*compute.InstanceGroup) error { + // We must track the ports even if creating the backends failed, because + // we might've created health-check for them. + be := &compute.BackendService{} + defer func() { b.snapshotter.Add(portKey(p.Port), be) }() + + var err error // Ensure health check for backend service exists hcLink, err := b.ensureHealthCheck(p) @@ -388,19 +407,6 @@ func (b *Backends) edgeHop(be *compute.BackendService, igs []*compute.InstanceGr return fmt.Errorf("received errors when updating backend service: %v", strings.Join(errs, "\n")) } -// Sync syncs backend services corresponding to ports in the given list. -func (b *Backends) Sync(svcNodePorts []ServicePort, igs []*compute.InstanceGroup) error { - glog.V(3).Infof("Sync: backends %v", svcNodePorts) - - // create backends for new ports, perform an edge hop for existing ports - for _, port := range svcNodePorts { - if err := b.Add(port, igs); err != nil { - return err - } - } - return nil -} - // GC garbage collects services corresponding to ports in the given list. func (b *Backends) GC(svcNodePorts []ServicePort) error { knownPorts := sets.NewString() diff --git a/controllers/gce/backends/backends_test.go b/controllers/gce/backends/backends_test.go index 8d116ac6ed..ba4ed51f5b 100644 --- a/controllers/gce/backends/backends_test.go +++ b/controllers/gce/backends/backends_test.go @@ -80,7 +80,7 @@ func TestBackendPoolAdd(t *testing.T) { // Add a backend for a port, then re-add the same port and // make sure it corrects a broken link from the backend to // the instance group. - err := pool.Add(nodePort, nil) + err := pool.Ensure([]ServicePort{nodePort}, nil) if err != nil { t.Fatalf("Did not find expect error when adding a nodeport: %v, err: %v", nodePort, err) } @@ -95,10 +95,11 @@ func TestBackendPoolAdd(t *testing.T) { t.Fatalf("Backend %v has wrong port %v, expected %v", be.Name, be.Port, nodePort) } - // Check that the instance group has the new port + // Check that the instance group has the new port. + ig, err := fakeIGs.GetInstanceGroup(namer.IGName(), defaultZone) var found bool - for _, port := range fakeIGs.Ports { - if port == nodePort.Port { + for _, port := range ig.NamedPorts { + if port.Port == nodePort.Port { found = true } } @@ -143,7 +144,7 @@ func TestHealthCheckMigration(t *testing.T) { hcp.CreateHttpHealthCheck(legacyHC) // Add the service port to the backend pool - pool.Add(p, nil) + pool.Ensure([]ServicePort{p}, nil) // Assert the proper health check was created hc, _ := pool.healthChecker.Get(p.Port) @@ -168,7 +169,7 @@ func TestBackendPoolUpdate(t *testing.T) { namer := utils.Namer{} p := ServicePort{Port: 3000, Protocol: utils.ProtocolHTTP} - pool.Add(p, nil) + pool.Ensure([]ServicePort{p}, nil) beName := namer.BeName(p.Port) be, err := f.GetGlobalBackendService(beName) @@ -188,7 +189,7 @@ func TestBackendPoolUpdate(t *testing.T) { // Update service port to encrypted p.Protocol = utils.ProtocolHTTPS - pool.Sync([]ServicePort{p}, nil) + pool.Ensure([]ServicePort{p}, nil) be, err = f.GetGlobalBackendService(beName) if err != nil { @@ -214,7 +215,7 @@ func TestBackendPoolChaosMonkey(t *testing.T) { namer := utils.Namer{} nodePort := ServicePort{Port: 8080, Protocol: utils.ProtocolHTTP} - pool.Add(nodePort, nil) + pool.Ensure([]ServicePort{nodePort}, nil) beName := namer.BeName(nodePort.Port) be, _ := f.GetGlobalBackendService(beName) @@ -227,7 +228,7 @@ func TestBackendPoolChaosMonkey(t *testing.T) { f.calls = []int{} f.UpdateGlobalBackendService(be) - pool.Add(nodePort, nil) + pool.Ensure([]ServicePort{nodePort}, nil) for _, call := range f.calls { if call == utils.Create { t.Fatalf("Unexpected create for existing backend service") @@ -260,10 +261,10 @@ func TestBackendPoolSync(t *testing.T) { f := NewFakeBackendServices(noOpErrFunc) fakeIGs := instances.NewFakeInstanceGroups(sets.NewString()) pool, _ := newTestJig(f, fakeIGs, true) - pool.Add(ServicePort{Port: 81}, nil) - pool.Add(ServicePort{Port: 90}, nil) - if err := pool.Sync(svcNodePorts, nil); err != nil { - t.Errorf("Expected backend pool to sync, err: %v", err) + pool.Ensure([]ServicePort{ServicePort{Port: 81}}, nil) + pool.Ensure([]ServicePort{ServicePort{Port: 90}}, nil) + if err := pool.Ensure(svcNodePorts, nil); err != nil { + t.Errorf("Expected backend pool to add node ports, err: %v", err) } if err := pool.GC(svcNodePorts); err != nil { t.Errorf("Expected backend pool to GC, err: %v", err) @@ -361,7 +362,7 @@ func TestBackendPoolDeleteLegacyHealthChecks(t *testing.T) { }) // Have pool sync the above backend service - bp.Add(ServicePort{Port: 80, Protocol: utils.ProtocolHTTPS}, nil) + bp.Ensure([]ServicePort{ServicePort{Port: 80, Protocol: utils.ProtocolHTTPS}}, nil) // Verify the legacy health check has been deleted _, err = hcp.GetHttpHealthCheck(beName) @@ -388,7 +389,7 @@ func TestBackendPoolShutdown(t *testing.T) { namer := utils.Namer{} // Add a backend-service and verify that it doesn't exist after Shutdown() - pool.Add(ServicePort{Port: 80}, nil) + pool.Ensure([]ServicePort{ServicePort{Port: 80}}, nil) pool.Shutdown() if _, err := f.GetGlobalBackendService(namer.BeName(80)); err == nil { t.Fatalf("%v", err) @@ -402,7 +403,7 @@ func TestBackendInstanceGroupClobbering(t *testing.T) { namer := utils.Namer{} // This will add the instance group k8s-ig to the instance pool - pool.Add(ServicePort{Port: 80}, nil) + pool.Ensure([]ServicePort{ServicePort{Port: 80}}, nil) be, err := f.GetGlobalBackendService(namer.BeName(80)) if err != nil { @@ -420,7 +421,7 @@ func TestBackendInstanceGroupClobbering(t *testing.T) { } // Make sure repeated adds don't clobber the inserted instance group - pool.Add(ServicePort{Port: 80}, nil) + pool.Ensure([]ServicePort{ServicePort{Port: 80}}, nil) be, err = f.GetGlobalBackendService(namer.BeName(80)) if err != nil { t.Fatalf("%v", err) @@ -462,7 +463,7 @@ func TestBackendCreateBalancingMode(t *testing.T) { return nil } - pool.Add(nodePort, nil) + pool.Ensure([]ServicePort{nodePort}, nil) be, err := f.GetGlobalBackendService(namer.BeName(nodePort.Port)) if err != nil { t.Fatalf("%v", err) diff --git a/controllers/gce/backends/interfaces.go b/controllers/gce/backends/interfaces.go index 586ceb17c5..451d5ea731 100644 --- a/controllers/gce/backends/interfaces.go +++ b/controllers/gce/backends/interfaces.go @@ -30,10 +30,9 @@ type probeProvider interface { // as gce backendServices, and sync them through the BackendServices interface. type BackendPool interface { Init(p probeProvider) - Add(port ServicePort, igs []*compute.InstanceGroup) error + Ensure(ports []ServicePort, igs []*compute.InstanceGroup) error Get(port int64) (*compute.BackendService, error) Delete(port int64) error - Sync(ports []ServicePort, igs []*compute.InstanceGroup) error GC(ports []ServicePort) error Shutdown() error Status(name string) string diff --git a/controllers/gce/controller/cluster_manager.go b/controllers/gce/controller/cluster_manager.go index e987711b97..b8b9955983 100644 --- a/controllers/gce/controller/cluster_manager.go +++ b/controllers/gce/controller/cluster_manager.go @@ -134,7 +134,7 @@ func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeName if err != nil { return igs, err } - if err := c.backendPool.Sync(backendServicePorts, igs); err != nil { + if err := c.backendPool.Ensure(backendServicePorts, igs); err != nil { return igs, err } if err := c.instancePool.Sync(nodeNames); err != nil { @@ -167,20 +167,12 @@ func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeName } func (c *ClusterManager) EnsureInstanceGroupsAndPorts(servicePorts []backends.ServicePort) ([]*compute.InstanceGroup, error) { - var igs []*compute.InstanceGroup - var err error + ports := []int64{} for _, p := range servicePorts { - // EnsureInstanceGroupsAndPorts always returns all the instance groups, so we can return - // the output of any call, no need to append the return from all calls. - // TODO: Ideally, we want to call CreateInstaceGroups only the first time and - // then call AddNamedPort multiple times. Need to update the interface to - // achieve this. - igs, _, err = instances.EnsureInstanceGroupsAndPorts(c.instancePool, c.ClusterNamer, p.Port) - if err != nil { - return nil, err - } + ports = append(ports, p.Port) } - return igs, nil + igs, _, err := instances.EnsureInstanceGroupsAndPorts(c.instancePool, c.ClusterNamer, ports) + return igs, err } // GC garbage collects unused resources. diff --git a/controllers/gce/controller/utils_test.go b/controllers/gce/controller/utils_test.go index 196f3c9372..88c70b696e 100644 --- a/controllers/gce/controller/utils_test.go +++ b/controllers/gce/controller/utils_test.go @@ -71,17 +71,12 @@ func TestInstancesAddedToZones(t *testing.T) { // Create 2 igs, one per zone. testIG := "test-ig" - testPort := int64(3001) - lbc.CloudClusterManager.instancePool.AddInstanceGroup(testIG, testPort) + lbc.CloudClusterManager.instancePool.AddInstanceGroup(testIG, []int64{int64(3001)}) // node pool syncs kube-nodes, this will add them to both igs. lbc.CloudClusterManager.instancePool.Sync([]string{"n1", "n2", "n3"}) gotZonesToNode := cm.fakeIGs.GetInstancesByZone() - if cm.fakeIGs.Ports[0] != testPort { - t.Errorf("Expected the same node port on all igs, got ports %+v", cm.fakeIGs.Ports) - } - for z, nodeNames := range zoneToNode { if ig, err := cm.fakeIGs.GetInstanceGroup(testIG, z); err != nil { t.Errorf("Failed to find ig %v in zone %v, found %+v: %v", testIG, z, ig, err) diff --git a/controllers/gce/instances/fakes.go b/controllers/gce/instances/fakes.go index 37d38fa817..ca42f57fd6 100644 --- a/controllers/gce/instances/fakes.go +++ b/controllers/gce/instances/fakes.go @@ -60,7 +60,6 @@ func (z *FakeZoneLister) GetZoneForNode(name string) (string, error) { type FakeInstanceGroups struct { instances sets.String instanceGroups []*compute.InstanceGroup - Ports []int64 getResult *compute.InstanceGroup listResult *compute.InstanceGroupsListInstances calls []int @@ -150,21 +149,18 @@ func (f *FakeInstanceGroups) RemoveInstancesFromInstanceGroup(name, zone string, } func (f *FakeInstanceGroups) SetNamedPortsOfInstanceGroup(igName, zone string, namedPorts []*compute.NamedPort) error { - found := false - for _, ig := range f.instanceGroups { - if ig.Name == igName && ig.Zone == zone { - found = true + var ig *compute.InstanceGroup + for _, igp := range f.instanceGroups { + if igp.Name == igName && igp.Zone == zone { + ig = igp break } } - if !found { + if ig == nil { return fmt.Errorf("Failed to find instance group %q in zone %q", igName, zone) } - f.Ports = f.Ports[:0] - for _, port := range namedPorts { - f.Ports = append(f.Ports, port.Port) - } + ig.NamedPorts = namedPorts return nil } diff --git a/controllers/gce/instances/instances.go b/controllers/gce/instances/instances.go index fd54de0340..7efb8d37cc 100644 --- a/controllers/gce/instances/instances.go +++ b/controllers/gce/instances/instances.go @@ -59,15 +59,18 @@ func (i *Instances) Init(zl zoneLister) { } // AddInstanceGroup creates or gets an instance group if it doesn't exist -// and adds the given port to it. Returns a list of one instance group per zone, -// all of which have the exact same named port. -func (i *Instances) AddInstanceGroup(name string, port int64) ([]*compute.InstanceGroup, *compute.NamedPort, error) { +// and adds the given ports to it. Returns a list of one instance group per zone, +// all of which have the exact same named ports. +func (i *Instances) AddInstanceGroup(name string, ports []int64) ([]*compute.InstanceGroup, []*compute.NamedPort, error) { igs := []*compute.InstanceGroup{} - namedPort := utils.GetNamedPort(port) + namedPorts := []*compute.NamedPort{} + for _, port := range ports { + namedPorts = append(namedPorts, utils.GetNamedPort(port)) + } zones, err := i.ListZones() if err != nil { - return igs, namedPort, err + return igs, namedPorts, err } defer i.snapshotter.Add(name, struct{}{}) @@ -99,23 +102,27 @@ func (i *Instances) AddInstanceGroup(name string, port int64) ([]*compute.Instan glog.V(3).Infof("Instance group %v already exists in zone %v", name, zone) } - found := false + existingPorts := map[int64]bool{} for _, np := range ig.NamedPorts { - if np.Port == port { + existingPorts[np.Port] = true + } + var newPorts []*compute.NamedPort + for _, np := range namedPorts { + if existingPorts[np.Port] { glog.V(3).Infof("Instance group %v already has named port %+v", ig.Name, np) - found = true - break + continue } + newPorts = append(newPorts, np) } - if !found { - glog.V(3).Infof("Instance group %v/%v does not have port %+v, adding it now.", zone, name, namedPort) - if err := i.cloud.SetNamedPortsOfInstanceGroup(ig.Name, zone, append(ig.NamedPorts, namedPort)); err != nil { + if len(newPorts) > 0 { + glog.V(5).Infof("Instance group %v/%v does not have ports %+v, adding them now.", zone, name, namedPorts) + if err := i.cloud.SetNamedPortsOfInstanceGroup(ig.Name, zone, append(ig.NamedPorts, namedPorts...)); err != nil { return nil, nil, err } } igs = append(igs, ig) } - return igs, namedPort, nil + return igs, namedPorts, nil } // DeleteInstanceGroup deletes the given IG by name, from all zones. diff --git a/controllers/gce/instances/instances_test.go b/controllers/gce/instances/instances_test.go index 9baca18a1e..b7b559eed0 100644 --- a/controllers/gce/instances/instances_test.go +++ b/controllers/gce/instances/instances_test.go @@ -34,7 +34,7 @@ func TestNodePoolSync(t *testing.T) { f := NewFakeInstanceGroups(sets.NewString( []string{"n1", "n2"}...)) pool := newNodePool(f, defaultZone) - pool.AddInstanceGroup("test", 80) + pool.AddInstanceGroup("test", []int64{80}) // KubeNodes: n1 // GCENodes: n1, n2 @@ -53,7 +53,7 @@ func TestNodePoolSync(t *testing.T) { f = NewFakeInstanceGroups(sets.NewString([]string{"n1"}...)) pool = newNodePool(f, defaultZone) - pool.AddInstanceGroup("test", 80) + pool.AddInstanceGroup("test", []int64{80}) f.calls = []int{} kubeNodes = sets.NewString([]string{"n1", "n2"}...) @@ -69,7 +69,7 @@ func TestNodePoolSync(t *testing.T) { f = NewFakeInstanceGroups(sets.NewString([]string{"n1", "n2"}...)) pool = newNodePool(f, defaultZone) - pool.AddInstanceGroup("test", 80) + pool.AddInstanceGroup("test", []int64{80}) f.calls = []int{} kubeNodes = sets.NewString([]string{"n1", "n2"}...) @@ -79,3 +79,49 @@ func TestNodePoolSync(t *testing.T) { "Did not expect any calls, got %+v", f.calls) } } + +func TestSetNamedPorts(t *testing.T) { + f := NewFakeInstanceGroups(sets.NewString( + []string{"ig"}...)) + pool := newNodePool(f, defaultZone) + + testCases := []struct { + newPorts []int64 + expectedPorts []int64 + }{ + { + // Verify adding a port works as expected. + []int64{80}, + []int64{80}, + }, + { + // Verify adding multiple ports at once works as expected. + []int64{81, 82}, + []int64{80, 81, 82}, + }, + { + // Adding existing ports should have no impact. + []int64{80, 82}, + []int64{80, 81, 82}, + }, + // TODO: Add tests to remove named ports when we support that. + } + for _, test := range testCases { + igs, _, err := pool.AddInstanceGroup("ig", test.newPorts) + if err != nil { + t.Fatalf("unexpected error in adding ports %v to instance group: %s", test.newPorts, err) + } + if len(igs) != 1 { + t.Fatalf("expected a single instance group, got: %v", igs) + } + actualPorts := igs[0].NamedPorts + if len(actualPorts) != len(test.expectedPorts) { + t.Fatalf("unexpected named ports on instance group. expected: %v, got: %v", test.expectedPorts, actualPorts) + } + for i, p := range igs[0].NamedPorts { + if p.Port != test.expectedPorts[i] { + t.Fatalf("unexpected named ports on instance group. expected: %v, got: %v", test.expectedPorts, actualPorts) + } + } + } +} diff --git a/controllers/gce/instances/interfaces.go b/controllers/gce/instances/interfaces.go index 94d3116b76..f0d4f0096e 100644 --- a/controllers/gce/instances/interfaces.go +++ b/controllers/gce/instances/interfaces.go @@ -32,7 +32,7 @@ type NodePool interface { Init(zl zoneLister) // The following 2 methods operate on instance groups. - AddInstanceGroup(name string, port int64) ([]*compute.InstanceGroup, *compute.NamedPort, error) + AddInstanceGroup(name string, ports []int64) ([]*compute.InstanceGroup, []*compute.NamedPort, error) DeleteInstanceGroup(name string) error // TODO: Refactor for modularity diff --git a/controllers/gce/instances/utils.go b/controllers/gce/instances/utils.go index 934311c66b..732ed9017f 100644 --- a/controllers/gce/instances/utils.go +++ b/controllers/gce/instances/utils.go @@ -8,6 +8,6 @@ import ( // Helper method to create instance groups. // This method exists to ensure that we are using the same logic at all places. -func EnsureInstanceGroupsAndPorts(nodePool NodePool, namer *utils.Namer, port int64) ([]*compute.InstanceGroup, *compute.NamedPort, error) { - return nodePool.AddInstanceGroup(namer.IGName(), port) +func EnsureInstanceGroupsAndPorts(nodePool NodePool, namer *utils.Namer, ports []int64) ([]*compute.InstanceGroup, []*compute.NamedPort, error) { + return nodePool.AddInstanceGroup(namer.IGName(), ports) } diff --git a/controllers/gce/loadbalancers/loadbalancers.go b/controllers/gce/loadbalancers/loadbalancers.go index 15ded2562b..2442cc2ee2 100644 --- a/controllers/gce/loadbalancers/loadbalancers.go +++ b/controllers/gce/loadbalancers/loadbalancers.go @@ -169,7 +169,7 @@ func (l *L7s) Sync(lbs []*L7RuntimeInfo) error { // Lazily create a default backend so we don't tax users who don't care // about Ingress by consuming 1 of their 3 GCE BackendServices. This // BackendService is GC'd when there are no more Ingresses. - if err := l.defaultBackendPool.Add(l.defaultBackendNodePort, nil); err != nil { + if err := l.defaultBackendPool.Ensure([]backends.ServicePort{l.defaultBackendNodePort}, nil); err != nil { return err } defaultBackend, err := l.defaultBackendPool.Get(l.defaultBackendNodePort.Port)