Merge pull request #242 from rramkumar1/default-backend-cleanup
Condense backendPool and defaultBackendPool
nicksardo committed May 10, 2018
2 parents a0023fb + f368eb5 commit 8a842a9
Showing 11 changed files with 207 additions and 219 deletions.
4 changes: 2 additions & 2 deletions cmd/glbc/main.go
@@ -90,15 +90,15 @@ func main() {

cloud := app.NewGCEClient()
defaultBackendServicePort := app.DefaultBackendServicePort(kubeClient)
clusterManager, err := controller.NewClusterManager(cloud, namer, *defaultBackendServicePort, flags.F.HealthCheckPath)
clusterManager, err := controller.NewClusterManager(cloud, namer, flags.F.HealthCheckPath)
if err != nil {
glog.Fatalf("Error creating cluster manager: %v", err)
}

enableNEG := cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureNetworkEndpointGroup)
stopCh := make(chan struct{})
ctx := context.NewControllerContext(kubeClient, flags.F.WatchNamespace, flags.F.ResyncPeriod, enableNEG)
lbc, err := controller.NewLoadBalancerController(kubeClient, stopCh, ctx, clusterManager, enableNEG)
lbc, err := controller.NewLoadBalancerController(kubeClient, stopCh, ctx, clusterManager, enableNEG, *defaultBackendServicePort)
if err != nil {
glog.Fatalf("Error creating load balancer controller: %v", err)
}
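In effect, the new wiring in main() resolves the default backend ServicePort once and hands it to the load balancer controller instead of the cluster manager. A condensed, hedged sketch of the resulting setup (flag parsing and error handling elided; names follow the hunk above):

	cloud := app.NewGCEClient()
	defaultBackendServicePort := app.DefaultBackendServicePort(kubeClient)

	// The cluster manager no longer takes the default backend port.
	clusterManager, err := controller.NewClusterManager(cloud, namer, flags.F.HealthCheckPath)
	if err != nil {
		glog.Fatalf("Error creating cluster manager: %v", err)
	}

	// The controller receives the default backend ServicePort directly.
	lbc, err := controller.NewLoadBalancerController(kubeClient, stopCh, ctx, clusterManager, enableNEG, *defaultBackendServicePort)
	if err != nil {
		glog.Fatalf("Error creating load balancer controller: %v", err)
	}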
28 changes: 2 additions & 26 deletions pkg/backends/backends.go
@@ -80,11 +80,7 @@ type Backends struct {
healthChecker healthchecks.HealthChecker
snapshotter storage.Snapshotter
prober ProbeProvider
// ignoredPorts are a set of ports excluded from GC, even
// after the Ingress has been deleted. Note that invoking
// a Delete() on these ports will still delete the backend.
ignoredPorts sets.String
namer *utils.Namer
namer *utils.Namer
}

// BackendService embeds both the GA and alpha compute BackendService types
@@ -195,20 +191,14 @@ func NewBackendPool(
healthChecker healthchecks.HealthChecker,
nodePool instances.NodePool,
namer *utils.Namer,
ignorePorts []int64,
resyncWithCloud bool) *Backends {

ignored := []string{}
for _, p := range ignorePorts {
ignored = append(ignored, portKey(p))
}
backendPool := &Backends{
cloud: cloud,
negGetter: negGetter,
nodePool: nodePool,
healthChecker: healthChecker,
namer: namer,
ignoredPorts: sets.NewString(ignored...),
}
if !resyncWithCloud {
backendPool.snapshotter = storage.NewInMemoryPool()
@@ -319,20 +309,6 @@ func (b *Backends) create(namedPort *compute.NamedPort, hcLink string, sp utils.
// Uses the given instance groups if non-nil, else creates instance groups.
func (b *Backends) Ensure(svcPorts []utils.ServicePort, igs []*compute.InstanceGroup) error {
glog.V(3).Infof("Sync: backends %v", svcPorts)
// Ideally callers should pass the instance groups to prevent recomputing them here.
// Igs can be nil in scenarios where we do not have instance groups such as
// while syncing default backend service.
if igs == nil {
ports := []int64{}
for _, p := range svcPorts {
ports = append(ports, p.NodePort)
}
var err error
igs, err = instances.EnsureInstanceGroupsAndPorts(b.nodePool, b.namer, ports)
if err != nil {
return err
}
}
// create backends for new ports, perform an edge hop for existing ports
for _, port := range svcPorts {
if err := b.ensureBackendService(port, igs); err != nil {
@@ -608,7 +584,7 @@ func (b *Backends) GC(svcNodePorts []utils.ServicePort) error {
return err
}
nodePort := int64(p)
if knownPorts.Has(portKey(nodePort)) || b.ignoredPorts.Has(portKey(nodePort)) {
if knownPorts.Has(portKey(nodePort)) {
continue
}
glog.V(3).Infof("GCing backend for port %v", p)
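Since Ensure no longer computes instance groups when none are passed, callers are now expected to ensure the instance groups (and their named ports) up front and hand the result in. A minimal sketch of the expected calling pattern, assuming a nodePool, namer, and backendPool like those the Backends type already holds (error handling abbreviated):

	// Collect the node ports for the service ports being synced.
	ports := []int64{}
	for _, p := range svcPorts {
		ports = append(ports, p.NodePort)
	}

	// Ensure the instance groups first, then pass them to the backend pool;
	// Ensure no longer recomputes them when given a nil slice.
	igs, err := instances.EnsureInstanceGroupsAndPorts(nodePool, namer, ports)
	if err != nil {
		return err
	}
	if err := backendPool.Ensure(svcPorts, igs); err != nil {
		return err
	}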
34 changes: 23 additions & 11 deletions pkg/backends/backends_test.go
@@ -64,7 +64,7 @@ func newTestJig(f BackendServices, fakeIGs instances.InstanceGroups, syncWithClo
nodePool.Init(&instances.FakeZoneLister{Zones: []string{defaultZone}})
healthCheckProvider := healthchecks.NewFakeHealthCheckProvider()
healthChecks := healthchecks.NewHealthChecker(healthCheckProvider, "/", defaultNamer)
bp := NewBackendPool(f, negGetter, healthChecks, nodePool, defaultNamer, []int64{}, syncWithCloud)
bp := NewBackendPool(f, negGetter, healthChecks, nodePool, defaultNamer, syncWithCloud)
probes := map[utils.ServicePort]*api_v1.Probe{{NodePort: 443, Protocol: annotations.ProtocolHTTPS}: existingProbe}
bp.Init(NewFakeProbeProvider(probes))

@@ -85,12 +85,16 @@ func TestBackendPoolAdd(t *testing.T) {
for _, sp := range testCases {
// For simplicity, these tests use 80/443 as nodeports
t.Run(fmt.Sprintf("Port:%v Protocol:%v", sp.NodePort, sp.Protocol), func(t *testing.T) {
igs, err := pool.nodePool.EnsureInstanceGroupsAndPorts(defaultNamer.InstanceGroup(), []int64{sp.NodePort})
if err != nil {
t.Fatalf("Did not expect error when ensuring IG for ServicePort %+v: %v", sp, err)
}
// Add a backend for a port, then re-add the same port and
// make sure it corrects a broken link from the backend to
// the instance group.
err := pool.Ensure([]utils.ServicePort{sp}, nil)
err = pool.Ensure([]utils.ServicePort{sp}, igs)
if err != nil {
t.Fatalf("Did not find expect error when adding a nodeport: %v, err: %v", sp, err)
t.Fatalf("Did not expect error when ensuring a ServicePort %+v: %v", sp, err)
}
beName := defaultNamer.Backend(sp.NodePort)

@@ -105,6 +109,9 @@ func TestBackendPoolAdd(t *testing.T) {

// Check that the instance group has the new port.
ig, err := fakeIGs.GetInstanceGroup(defaultNamer.InstanceGroup(), defaultZone)
if err != nil {
t.Fatalf("Did not expect error when getting IG's: %v", err)
}
var found bool
for _, port := range ig.NamedPorts {
if port.Port == sp.NodePort {
@@ -306,7 +313,8 @@ func TestBackendPoolChaosMonkey(t *testing.T) {
pool, _ := newTestJig(f, fakeIGs, false)

sp := utils.ServicePort{NodePort: 8080, Protocol: annotations.ProtocolHTTP}
pool.Ensure([]utils.ServicePort{sp}, nil)
igs, _ := pool.nodePool.EnsureInstanceGroupsAndPorts(defaultNamer.InstanceGroup(), []int64{sp.NodePort})
pool.Ensure([]utils.ServicePort{sp}, igs)
beName := defaultNamer.Backend(sp.NodePort)

be, _ := f.GetGlobalBackendService(beName)
@@ -319,7 +327,8 @@ func TestBackendPoolChaosMonkey(t *testing.T) {
f.calls = []int{}
f.UpdateGlobalBackendService(be)

pool.Ensure([]utils.ServicePort{sp}, nil)
igs, _ = pool.nodePool.EnsureInstanceGroupsAndPorts(defaultNamer.InstanceGroup(), []int64{sp.NodePort})
pool.Ensure([]utils.ServicePort{sp}, igs)
for _, call := range f.calls {
if call == utils.Create {
t.Fatalf("Unexpected create for existing backend service")
@@ -426,7 +435,7 @@ func TestBackendPoolDeleteLegacyHealthChecks(t *testing.T) {
nodePool.Init(&instances.FakeZoneLister{Zones: []string{defaultZone}})
hcp := healthchecks.NewFakeHealthCheckProvider()
healthChecks := healthchecks.NewHealthChecker(hcp, "/", defaultNamer)
bp := NewBackendPool(f, negGetter, healthChecks, nodePool, defaultNamer, []int64{}, false)
bp := NewBackendPool(f, negGetter, healthChecks, nodePool, defaultNamer, false)
probes := map[utils.ServicePort]*api_v1.Probe{}
bp.Init(NewFakeProbeProvider(probes))

@@ -490,8 +499,9 @@ func TestBackendInstanceGroupClobbering(t *testing.T) {
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString(), defaultNamer)
pool, _ := newTestJig(f, fakeIGs, false)

// This will add the instance group k8s-ig to the instance pool
pool.Ensure([]utils.ServicePort{{NodePort: 80}}, nil)
sp := utils.ServicePort{NodePort: 80}
igs, _ := pool.nodePool.EnsureInstanceGroupsAndPorts(defaultNamer.InstanceGroup(), []int64{sp.NodePort})
pool.Ensure([]utils.ServicePort{sp}, igs)

be, err := f.GetGlobalBackendService(defaultNamer.Backend(80))
if err != nil {
@@ -509,7 +519,8 @@ func TestBackendInstanceGroupClobbering(t *testing.T) {
}

// Make sure repeated adds don't clobber the inserted instance group
pool.Ensure([]utils.ServicePort{{NodePort: 80}}, nil)
igs, _ = pool.nodePool.EnsureInstanceGroupsAndPorts(defaultNamer.InstanceGroup(), []int64{sp.NodePort})
pool.Ensure([]utils.ServicePort{sp}, igs)
be, err = f.GetGlobalBackendService(defaultNamer.Backend(80))
if err != nil {
t.Fatalf("%v", err)
@@ -549,7 +560,8 @@ func TestBackendCreateBalancingMode(t *testing.T) {
return nil
}

pool.Ensure([]utils.ServicePort{sp}, nil)
igs, _ := pool.nodePool.EnsureInstanceGroupsAndPorts(defaultNamer.InstanceGroup(), []int64{sp.NodePort})
pool.Ensure([]utils.ServicePort{sp}, igs)
be, err := f.GetGlobalBackendService(defaultNamer.Backend(sp.NodePort))
if err != nil {
t.Fatalf("%v", err)
@@ -604,7 +616,7 @@ func TestLinkBackendServiceToNEG(t *testing.T) {
nodePool.Init(&instances.FakeZoneLister{Zones: []string{defaultZone}})
hcp := healthchecks.NewFakeHealthCheckProvider()
healthChecks := healthchecks.NewHealthChecker(hcp, "/", defaultNamer)
bp := NewBackendPool(f, fakeNEG, healthChecks, nodePool, defaultNamer, []int64{}, false)
bp := NewBackendPool(f, fakeNEG, healthChecks, nodePool, defaultNamer, false)

svcPort := utils.ServicePort{
NodePort: 30001,
23 changes: 7 additions & 16 deletions pkg/controller/cluster_manager.go
@@ -40,12 +40,11 @@ const (

// ClusterManager manages cluster resource pools.
type ClusterManager struct {
ClusterNamer *utils.Namer
defaultBackendNodePort utils.ServicePort
instancePool instances.NodePool
backendPool backends.BackendPool
l7Pool loadbalancers.LoadBalancerPool
firewallPool firewalls.SingleFirewallPool
ClusterNamer *utils.Namer
instancePool instances.NodePool
backendPool backends.BackendPool
l7Pool loadbalancers.LoadBalancerPool
firewallPool firewalls.SingleFirewallPool

// TODO: Refactor so we simply init a health check pool.
// Currently health checks are tied to backends because each backend needs
@@ -110,11 +109,6 @@ func (c *ClusterManager) EnsureLoadBalancer(lb *loadbalancers.L7RuntimeInfo, lbS
}

func (c *ClusterManager) EnsureInstanceGroupsAndPorts(nodeNames []string, servicePorts []utils.ServicePort) ([]*compute.InstanceGroup, error) {
if len(servicePorts) != 0 {
// Add the default backend node port to the list of named ports for instance groups.
servicePorts = append(servicePorts, c.defaultBackendNodePort)
}

// Convert to slice of NodePort int64s.
ports := []int64{}
for _, p := range uniq(servicePorts) {
@@ -187,7 +181,6 @@ func (c *ClusterManager) GC(lbNames []string, nodePorts []utils.ServicePort) err
func NewClusterManager(
cloud *gce.GCECloud,
namer *utils.Namer,
defaultBackendNodePort utils.ServicePort,
defaultHealthCheckPath string) (*ClusterManager, error) {

// Names are fundamental to the cluster, the uid allocator makes sure names don't collide.
@@ -204,12 +197,10 @@ func NewClusterManager(
cluster.healthCheckers = []healthchecks.HealthChecker{healthChecker, defaultBackendHealthChecker}

// TODO: This needs to change to a consolidated management of the default backend.
cluster.backendPool = backends.NewBackendPool(cloud, cloud, healthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{defaultBackendNodePort.NodePort}, true)
defaultBackendPool := backends.NewBackendPool(cloud, cloud, defaultBackendHealthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{}, false)
cluster.defaultBackendNodePort = defaultBackendNodePort
cluster.backendPool = backends.NewBackendPool(cloud, cloud, healthChecker, cluster.instancePool, cluster.ClusterNamer, true)

// L7 pool creates targetHTTPProxy, ForwardingRules, UrlMaps, StaticIPs.
cluster.l7Pool = loadbalancers.NewLoadBalancerPool(cloud, defaultBackendPool, defaultBackendNodePort, cluster.ClusterNamer)
cluster.l7Pool = loadbalancers.NewLoadBalancerPool(cloud, cluster.ClusterNamer)
cluster.firewallPool = firewalls.NewFirewallPool(cloud, cluster.ClusterNamer, gce.LoadBalancerSrcRanges(), flags.F.NodePortRanges.Values())
return &cluster, nil
}
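With the two pools condensed, NewClusterManager builds a single backend pool and creates the L7 pool without any default-backend arguments; the default backend port now reaches instance groups and the URL map through the controller (see the controller changes below). A rough illustration of the condensed construction, using the names from the hunk above:

	// One backend pool serves regular and default backends alike;
	// the ignorePorts argument is gone from NewBackendPool.
	cluster.backendPool = backends.NewBackendPool(cloud, cloud, healthChecker, cluster.instancePool, cluster.ClusterNamer, true)

	// The L7 pool no longer needs a default backend pool or node port.
	cluster.l7Pool = loadbalancers.NewLoadBalancerPool(cloud, cluster.ClusterNamer)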
30 changes: 17 additions & 13 deletions pkg/controller/controller.go
@@ -84,29 +84,32 @@ type LoadBalancerController struct {
hasSynced func() bool
// negEnabled indicates whether NEG feature is enabled.
negEnabled bool
// defaultBackendSvcPort is the ServicePort for the system default backend.
defaultBackendSvcPort utils.ServicePort
}

// NewLoadBalancerController creates a controller for gce loadbalancers.
// - kubeClient: A kubernetes REST client.
// - clusterManager: A ClusterManager capable of creating all cloud resources
// required for L7 loadbalancing.
// - resyncPeriod: Watchers relist from the Kubernetes API server this often.
func NewLoadBalancerController(kubeClient kubernetes.Interface, stopCh chan struct{}, ctx *context.ControllerContext, clusterManager *ClusterManager, negEnabled bool) (*LoadBalancerController, error) {
func NewLoadBalancerController(kubeClient kubernetes.Interface, stopCh chan struct{}, ctx *context.ControllerContext, clusterManager *ClusterManager, negEnabled bool, defaultBackendSvcPort utils.ServicePort) (*LoadBalancerController, error) {
broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(glog.Infof)
broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{
Interface: kubeClient.Core().Events(""),
})
lbc := LoadBalancerController{
client: kubeClient,
ctx: ctx,
ingLister: StoreToIngressLister{Store: ctx.IngressInformer.GetStore()},
nodeLister: ctx.NodeInformer.GetIndexer(),
nodes: NewNodeController(ctx, clusterManager),
CloudClusterManager: clusterManager,
stopCh: stopCh,
hasSynced: ctx.HasSynced,
negEnabled: negEnabled,
client: kubeClient,
ctx: ctx,
ingLister: StoreToIngressLister{Store: ctx.IngressInformer.GetStore()},
nodeLister: ctx.NodeInformer.GetIndexer(),
nodes: NewNodeController(ctx, clusterManager),
CloudClusterManager: clusterManager,
stopCh: stopCh,
hasSynced: ctx.HasSynced,
negEnabled: negEnabled,
defaultBackendSvcPort: defaultBackendSvcPort,
}
lbc.ingQueue = utils.NewPeriodicTaskQueue("ingresses", lbc.sync)

@@ -284,7 +287,7 @@ func (lbc *LoadBalancerController) sync(key string) (retErr error) {
}

func (lbc *LoadBalancerController) ensureIngress(key string, ing *extensions.Ingress, nodeNames []string, gceSvcPorts []utils.ServicePort) error {
urlMap := lbc.Translator.TranslateIngress(ing)
urlMap := lbc.Translator.TranslateIngress(ing, lbc.defaultBackendSvcPort)
ingSvcPorts := urlMap.AllServicePorts()
igs, err := lbc.CloudClusterManager.EnsureInstanceGroupsAndPorts(nodeNames, ingSvcPorts)
if err != nil {
@@ -310,6 +313,7 @@ func (lbc *LoadBalancerController) ensureIngress(key string, ing *extensions.Ing
if err != nil {
return err
}
lb.UrlMap = urlMap

// Create the backend services and higher-level LB resources.
if err = lbc.CloudClusterManager.EnsureLoadBalancer(lb, ingSvcPorts, igs); err != nil {
@@ -348,7 +352,7 @@ func (lbc *LoadBalancerController) ensureIngress(key string, ing *extensions.Ing
return fmt.Errorf("unable to get loadbalancer: %v", err)
}

if err := l7.UpdateUrlMap(urlMap); err != nil {
if err := l7.UpdateUrlMap(); err != nil {
return fmt.Errorf("update URL Map error: %v", err)
}

@@ -445,7 +449,7 @@ func updateAnnotations(client kubernetes.Interface, name, namespace string, anno
func (lbc *LoadBalancerController) ToSvcPorts(ings *extensions.IngressList) []utils.ServicePort {
var knownPorts []utils.ServicePort
for _, ing := range ings.Items {
urlMap := lbc.Translator.TranslateIngress(&ing)
urlMap := lbc.Translator.TranslateIngress(&ing, lbc.defaultBackendSvcPort)
knownPorts = append(knownPorts, urlMap.AllServicePorts()...)
}
return knownPorts
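Read together, the controller hunks change how the default backend and URL map flow: the ServicePort enters at translation time, and the URL map rides on the L7 runtime info rather than being passed to UpdateUrlMap. A hedged sketch of the resulting sequence inside ensureIngress, stitched from the hunks above (error handling trimmed; the exact placement of intermediate steps is assumed):

	// Translate the Ingress with the default backend port folded in.
	urlMap := lbc.Translator.TranslateIngress(ing, lbc.defaultBackendSvcPort)
	ingSvcPorts := urlMap.AllServicePorts()

	// Ensure instance groups for every referenced service port.
	igs, err := lbc.CloudClusterManager.EnsureInstanceGroupsAndPorts(nodeNames, ingSvcPorts)
	if err != nil {
		return err
	}

	// The URL map travels on the runtime info instead of being passed around.
	lb.UrlMap = urlMap
	if err := lbc.CloudClusterManager.EnsureLoadBalancer(lb, ingSvcPorts, igs); err != nil {
		return err
	}

	// Later, the L7 object syncs its URL map from the runtime info.
	if err := l7.UpdateUrlMap(); err != nil {
		return fmt.Errorf("update URL Map error: %v", err)
	}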
9 changes: 5 additions & 4 deletions pkg/controller/controller_test.go
@@ -55,7 +55,7 @@ func newLoadBalancerController(t *testing.T, cm *fakeClusterManager) *LoadBalanc
kubeClient := fake.NewSimpleClientset()
stopCh := make(chan struct{})
ctx := context.NewControllerContext(kubeClient, api_v1.NamespaceAll, 1*time.Second, true)
lb, err := NewLoadBalancerController(kubeClient, stopCh, ctx, cm.ClusterManager, true)
lb, err := NewLoadBalancerController(kubeClient, stopCh, ctx, cm.ClusterManager, true, testDefaultBeNodePort)
if err != nil {
t.Fatalf("%v", err)
}
@@ -155,6 +155,7 @@ func gceURLMapFromPrimitive(primitiveMap utils.PrimitivePathMap, pm *nodePortMan
}
urlMap.PutPathRulesForHost(hostname, pathRules)
}
urlMap.DefaultBackend = testDefaultBeNodePort
return urlMap
}

@@ -326,14 +327,14 @@ func TestLbFaultyUpdate(t *testing.T) {
t.Fatalf("cm.fakeLbs.CheckURLMap(...) = %v, want nil", err)
}

// Change the urlmap directly through the lb pool, resync, and
// Change the urlmap directly, resync, and
// make sure the controller corrects it.
forcedUpdate := gceURLMapFromPrimitive(utils.PrimitivePathMap{
l7.RuntimeInfo().UrlMap = gceURLMapFromPrimitive(utils.PrimitivePathMap{
"foo.example.com": {
"/foo1": "foo2svc",
},
}, pm)
l7.UpdateUrlMap(forcedUpdate)
l7.UpdateUrlMap()

if err := lbc.sync(ingStoreKey); err != nil {
t.Fatalf("lbc.sync() = err %v", err)
10 changes: 2 additions & 8 deletions pkg/controller/fakes.go
@@ -64,14 +64,8 @@ func NewFakeClusterManager(clusterName, firewallName string) *fakeClusterManager
backendPool := backends.NewBackendPool(
fakeBackends,
fakeNEG,
healthChecker, nodePool, namer, []int64{}, false)
l7Pool := loadbalancers.NewLoadBalancerPool(
fakeLbs,
// TODO: change this
backendPool,
testDefaultBeNodePort,
namer,
)
healthChecker, nodePool, namer, false)
l7Pool := loadbalancers.NewLoadBalancerPool(fakeLbs, namer)
frPool := firewalls.NewFirewallPool(firewalls.NewFakeFirewallsProvider(false, false), namer, testSrcRanges, testNodePortRanges)
cm := &ClusterManager{
ClusterNamer: namer,
(Diffs for the remaining 4 changed files are not shown.)
