Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding logic to GCE ingress controller to handle multi cluster ingresses #1033

Merged
merged 3 commits into from
Sep 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions controllers/gce/backends/backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,15 +211,22 @@ func (b *Backends) create(namedPort *compute.NamedPort, hcLink string, sp Servic
}

// Add will get or create a Backend for the given port.
func (b *Backends) Add(p ServicePort) error {
// Uses the given instance groups if non-nil, else creates instance groups.
func (b *Backends) Add(p ServicePort, igs []*compute.InstanceGroup) error {
// We must track the port even if creating the backend failed, because
// we might've created a health-check for it.
be := &compute.BackendService{}
defer func() { b.snapshotter.Add(portKey(p.Port), be) }()

igs, namedPort, err := b.nodePool.AddInstanceGroup(b.namer.IGName(), p.Port)
if err != nil {
return err
var err error
// Ideally callers should pass the instance groups to prevent recomputing them here.
// Igs can be nil in scenarios where we do not have instance groups such as
// while syncing default backend service.
if igs == nil {
igs, _, err = instances.EnsureInstanceGroupsAndPorts(b.nodePool, b.namer, p.Port)
if err != nil {
return err
}
}

// Ensure health check for backend service exists
Expand All @@ -232,6 +239,7 @@ func (b *Backends) Add(p ServicePort) error {
pName := b.namer.BeName(p.Port)
be, _ = b.Get(p.Port)
if be == nil {
namedPort := utils.GetNamedPort(p.Port)
glog.V(2).Infof("Creating backend service for port %v named port %v", p.Port, namedPort)
be, err = b.create(namedPort, hcLink, p, pName)
if err != nil {
Expand Down Expand Up @@ -381,12 +389,12 @@ func (b *Backends) edgeHop(be *compute.BackendService, igs []*compute.InstanceGr
}

// Sync syncs backend services corresponding to ports in the given list.
func (b *Backends) Sync(svcNodePorts []ServicePort) error {
func (b *Backends) Sync(svcNodePorts []ServicePort, igs []*compute.InstanceGroup) error {
glog.V(3).Infof("Sync: backends %v", svcNodePorts)

// create backends for new ports, perform an edge hop for existing ports
for _, port := range svcNodePorts {
if err := b.Add(port); err != nil {
if err := b.Add(port, igs); err != nil {
return err
}
}
Expand Down
28 changes: 14 additions & 14 deletions controllers/gce/backends/backends_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestBackendPoolAdd(t *testing.T) {
// Add a backend for a port, then re-add the same port and
// make sure it corrects a broken link from the backend to
// the instance group.
err := pool.Add(nodePort)
err := pool.Add(nodePort, nil)
if err != nil {
t.Fatalf("Did not find expect error when adding a nodeport: %v, err: %v", nodePort, err)
}
Expand Down Expand Up @@ -143,7 +143,7 @@ func TestHealthCheckMigration(t *testing.T) {
hcp.CreateHttpHealthCheck(legacyHC)

// Add the service port to the backend pool
pool.Add(p)
pool.Add(p, nil)

// Assert the proper health check was created
hc, _ := pool.healthChecker.Get(p.Port)
Expand All @@ -168,7 +168,7 @@ func TestBackendPoolUpdate(t *testing.T) {
namer := utils.Namer{}

p := ServicePort{Port: 3000, Protocol: utils.ProtocolHTTP}
pool.Add(p)
pool.Add(p, nil)
beName := namer.BeName(p.Port)

be, err := f.GetGlobalBackendService(beName)
Expand All @@ -188,7 +188,7 @@ func TestBackendPoolUpdate(t *testing.T) {

// Update service port to encrypted
p.Protocol = utils.ProtocolHTTPS
pool.Sync([]ServicePort{p})
pool.Sync([]ServicePort{p}, nil)

be, err = f.GetGlobalBackendService(beName)
if err != nil {
Expand All @@ -214,7 +214,7 @@ func TestBackendPoolChaosMonkey(t *testing.T) {
namer := utils.Namer{}

nodePort := ServicePort{Port: 8080, Protocol: utils.ProtocolHTTP}
pool.Add(nodePort)
pool.Add(nodePort, nil)
beName := namer.BeName(nodePort.Port)

be, _ := f.GetGlobalBackendService(beName)
Expand All @@ -227,7 +227,7 @@ func TestBackendPoolChaosMonkey(t *testing.T) {
f.calls = []int{}
f.UpdateGlobalBackendService(be)

pool.Add(nodePort)
pool.Add(nodePort, nil)
for _, call := range f.calls {
if call == utils.Create {
t.Fatalf("Unexpected create for existing backend service")
Expand Down Expand Up @@ -260,9 +260,9 @@ func TestBackendPoolSync(t *testing.T) {
f := NewFakeBackendServices(noOpErrFunc)
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString())
pool, _ := newTestJig(f, fakeIGs, true)
pool.Add(ServicePort{Port: 81})
pool.Add(ServicePort{Port: 90})
if err := pool.Sync(svcNodePorts); err != nil {
pool.Add(ServicePort{Port: 81}, nil)
pool.Add(ServicePort{Port: 90}, nil)
if err := pool.Sync(svcNodePorts, nil); err != nil {
t.Errorf("Expected backend pool to sync, err: %v", err)
}
if err := pool.GC(svcNodePorts); err != nil {
Expand Down Expand Up @@ -361,7 +361,7 @@ func TestBackendPoolDeleteLegacyHealthChecks(t *testing.T) {
})

// Have pool sync the above backend service
bp.Add(ServicePort{Port: 80, Protocol: utils.ProtocolHTTPS})
bp.Add(ServicePort{Port: 80, Protocol: utils.ProtocolHTTPS}, nil)

// Verify the legacy health check has been deleted
_, err = hcp.GetHttpHealthCheck(beName)
Expand All @@ -388,7 +388,7 @@ func TestBackendPoolShutdown(t *testing.T) {
namer := utils.Namer{}

// Add a backend-service and verify that it doesn't exist after Shutdown()
pool.Add(ServicePort{Port: 80})
pool.Add(ServicePort{Port: 80}, nil)
pool.Shutdown()
if _, err := f.GetGlobalBackendService(namer.BeName(80)); err == nil {
t.Fatalf("%v", err)
Expand All @@ -402,7 +402,7 @@ func TestBackendInstanceGroupClobbering(t *testing.T) {
namer := utils.Namer{}

// This will add the instance group k8s-ig to the instance pool
pool.Add(ServicePort{Port: 80})
pool.Add(ServicePort{Port: 80}, nil)

be, err := f.GetGlobalBackendService(namer.BeName(80))
if err != nil {
Expand All @@ -420,7 +420,7 @@ func TestBackendInstanceGroupClobbering(t *testing.T) {
}

// Make sure repeated adds don't clobber the inserted instance group
pool.Add(ServicePort{Port: 80})
pool.Add(ServicePort{Port: 80}, nil)
be, err = f.GetGlobalBackendService(namer.BeName(80))
if err != nil {
t.Fatalf("%v", err)
Expand Down Expand Up @@ -462,7 +462,7 @@ func TestBackendCreateBalancingMode(t *testing.T) {
return nil
}

pool.Add(nodePort)
pool.Add(nodePort, nil)
be, err := f.GetGlobalBackendService(namer.BeName(nodePort.Port))
if err != nil {
t.Fatalf("%v", err)
Expand Down
4 changes: 2 additions & 2 deletions controllers/gce/backends/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ type probeProvider interface {
// as gce backendServices, and sync them through the BackendServices interface.
type BackendPool interface {
Init(p probeProvider)
Add(port ServicePort) error
Add(port ServicePort, igs []*compute.InstanceGroup) error
Get(port int64) (*compute.BackendService, error)
Delete(port int64) error
Sync(ports []ServicePort) error
Sync(ports []ServicePort, igs []*compute.InstanceGroup) error
GC(ports []ServicePort) error
Shutdown() error
Status(name string) string
Expand Down
58 changes: 40 additions & 18 deletions controllers/gce/controller/cluster_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/golang/glog"

compute "google.golang.org/api/compute/v1"
gce "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"

"k8s.io/ingress/controllers/gce/backends"
Expand Down Expand Up @@ -108,41 +109,45 @@ func (c *ClusterManager) shutdown() error {
}

// Checkpoint performs a checkpoint with the cloud.
// - lbNames are the names of L7 loadbalancers we wish to exist. If they already
// - lbs are the single cluster L7 loadbalancers we wish to exist. If they already
// exist, they should not have any broken links between say, a UrlMap and
// TargetHttpProxy.
// - nodeNames are the names of nodes we wish to add to all loadbalancer
// instance groups.
// - nodePorts are the ports for which we require BackendServices. Each of
// these ports must also be opened on the corresponding Instance Group.
// - backendServicePorts are the ports for which we require BackendServices.
// - namedPorts are the ports which must be opened on instance groups.
// Returns the list of all instance groups corresponding to the given loadbalancers.
// If in performing the checkpoint the cluster manager runs out of quota, a
// googleapi 403 is returned.
func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, nodePorts []backends.ServicePort) error {
func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, backendServicePorts []backends.ServicePort, namedPorts []backends.ServicePort) ([]*compute.InstanceGroup, error) {
if len(namedPorts) != 0 {
// Add the default backend node port to the list of named ports for instance groups.
namedPorts = append(namedPorts, c.defaultBackendNodePort)
}
// Multiple ingress paths can point to the same service (and hence nodePort)
// but each nodePort can only have one set of cloud resources behind it. So
// don't waste time double validating GCE BackendServices.
portMap := map[int64]backends.ServicePort{}
for _, p := range nodePorts {
portMap[p.Port] = p
}
nodePorts = []backends.ServicePort{}
for _, sp := range portMap {
nodePorts = append(nodePorts, sp)
namedPorts = uniq(namedPorts)
backendServicePorts = uniq(backendServicePorts)
// Create Instance Groups.
igs, err := c.EnsureInstanceGroupsAndPorts(namedPorts)
if err != nil {
return igs, err
}
if err := c.backendPool.Sync(nodePorts); err != nil {
return err
if err := c.backendPool.Sync(backendServicePorts, igs); err != nil {
return igs, err
}
if err := c.instancePool.Sync(nodeNames); err != nil {
return err
return igs, err
}
if err := c.l7Pool.Sync(lbs); err != nil {
return err
return igs, err
}

// TODO: Manage default backend and its firewall rule in a centralized way.
// DefaultBackend is managed in l7 pool, which doesn't understand instances,
// which the firewall rule requires.
fwNodePorts := nodePorts
fwNodePorts := backendServicePorts
if len(lbs) != 0 {
// If there are no Ingresses, we shouldn't be allowing traffic to the
// default backend. Equally importantly if the cluster gets torn down
Expand All @@ -155,10 +160,27 @@ func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeName
np = append(np, p.Port)
}
if err := c.firewallPool.Sync(np, nodeNames); err != nil {
return err
return igs, err
}

return nil
return igs, nil
}

func (c *ClusterManager) EnsureInstanceGroupsAndPorts(servicePorts []backends.ServicePort) ([]*compute.InstanceGroup, error) {
var igs []*compute.InstanceGroup
var err error
for _, p := range servicePorts {
// EnsureInstanceGroupsAndPorts always returns all the instance groups, so we can return
// the output of any call, no need to append the return from all calls.
// TODO: Ideally, we want to call CreateInstaceGroups only the first time and
// then call AddNamedPort multiple times. Need to update the interface to
// achieve this.
igs, _, err = instances.EnsureInstanceGroupsAndPorts(c.instancePool, c.ClusterNamer, p.Port)
if err != nil {
return nil, err
}
}
return igs, nil
}

// GC garbage collects unused resources.
Expand Down
Loading