Skip to content

Commit

Permalink
Condense health checkers into one health checker for all backends.
Browse files Browse the repository at this point in the history
  • Loading branch information
rramkumar1 committed May 14, 2018
1 parent f6af412 commit 04b548e
Show file tree
Hide file tree
Showing 11 changed files with 134 additions and 118 deletions.
4 changes: 2 additions & 2 deletions cmd/glbc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,15 @@ func main() {

cloud := app.NewGCEClient()
defaultBackendServicePort := app.DefaultBackendServicePort(kubeClient)
clusterManager, err := controller.NewClusterManager(cloud, namer, flags.F.HealthCheckPath)
clusterManager, err := controller.NewClusterManager(cloud, namer, *defaultBackendServicePort, flags.F.HealthCheckPath, flags.F.DefaultSvcHealthCheckPath)
if err != nil {
glog.Fatalf("Error creating cluster manager: %v", err)
}

enableNEG := cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureNetworkEndpointGroup)
stopCh := make(chan struct{})
ctx := context.NewControllerContext(kubeClient, flags.F.WatchNamespace, flags.F.ResyncPeriod, enableNEG)
lbc, err := controller.NewLoadBalancerController(kubeClient, stopCh, ctx, clusterManager, enableNEG, *defaultBackendServicePort)
lbc, err := controller.NewLoadBalancerController(kubeClient, stopCh, ctx, clusterManager, enableNEG)
if err != nil {
glog.Fatalf("Error creating load balancer controller: %v", err)
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/backends/backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,7 @@ func (b *Backends) Get(name string, isAlpha bool) (*BackendService, error) {
}

func (b *Backends) ensureHealthCheck(sp utils.ServicePort) (string, error) {
name := sp.BackendName(b.namer)
hc := b.healthChecker.New(name, sp.NodePort, sp.Protocol, sp.NEGEnabled)
hc := b.healthChecker.New(sp)
existingLegacyHC, err := b.healthChecker.GetLegacy(sp.NodePort)
if err != nil && !utils.IsNotFoundError(err) {
return "", err
Expand Down
11 changes: 6 additions & 5 deletions pkg/backends/backends_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ import (
const defaultZone = "zone-a"

var (
defaultNamer = utils.NewNamer("uid1", "fw1")
existingProbe = &api_v1.Probe{
defaultNamer = utils.NewNamer("uid1", "fw1")
defaultBackendSvc = types.NamespacedName{Namespace: "system", Name: "default"}
existingProbe = &api_v1.Probe{
Handler: api_v1.Handler{
HTTPGet: &api_v1.HTTPGetAction{
Scheme: api_v1.URISchemeHTTPS,
Expand All @@ -63,7 +64,7 @@ func newTestJig(f BackendServices, fakeIGs instances.InstanceGroups, syncWithClo
nodePool := instances.NewNodePool(fakeIGs, defaultNamer)
nodePool.Init(&instances.FakeZoneLister{Zones: []string{defaultZone}})
healthCheckProvider := healthchecks.NewFakeHealthCheckProvider()
healthChecks := healthchecks.NewHealthChecker(healthCheckProvider, "/", defaultNamer)
healthChecks := healthchecks.NewHealthChecker(healthCheckProvider, "/", "/healthz", defaultNamer, defaultBackendSvc)
bp := NewBackendPool(f, negGetter, healthChecks, nodePool, defaultNamer, syncWithCloud)
probes := map[utils.ServicePort]*api_v1.Probe{{NodePort: 443, Protocol: annotations.ProtocolHTTPS}: existingProbe}
bp.Init(NewFakeProbeProvider(probes))
Expand Down Expand Up @@ -602,7 +603,7 @@ func TestBackendPoolDeleteLegacyHealthChecks(t *testing.T) {
nodePool := instances.NewNodePool(fakeIGs, defaultNamer)
nodePool.Init(&instances.FakeZoneLister{Zones: []string{defaultZone}})
hcp := healthchecks.NewFakeHealthCheckProvider()
healthChecks := healthchecks.NewHealthChecker(hcp, "/", defaultNamer)
healthChecks := healthchecks.NewHealthChecker(hcp, "/", "/healthz", defaultNamer, defaultBackendSvc)
bp := NewBackendPool(f, negGetter, healthChecks, nodePool, defaultNamer, false)
probes := map[utils.ServicePort]*api_v1.Probe{}
bp.Init(NewFakeProbeProvider(probes))
Expand Down Expand Up @@ -783,7 +784,7 @@ func TestLinkBackendServiceToNEG(t *testing.T) {
nodePool := instances.NewNodePool(fakeIGs, defaultNamer)
nodePool.Init(&instances.FakeZoneLister{Zones: []string{defaultZone}})
hcp := healthchecks.NewFakeHealthCheckProvider()
healthChecks := healthchecks.NewHealthChecker(hcp, "/", defaultNamer)
healthChecks := healthchecks.NewHealthChecker(hcp, "/", "/healthz", defaultNamer, defaultBackendSvc)
bp := NewBackendPool(f, fakeNEG, healthChecks, nodePool, defaultNamer, false)

svcPort := utils.ServicePort{
Expand Down
31 changes: 12 additions & 19 deletions pkg/controller/cluster_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,6 @@ import (
"k8s.io/ingress-gce/pkg/utils"
)

const (
defaultPort = 80
defaultHealthCheckPath = "/"
)

// ClusterManager manages cluster resource pools.
type ClusterManager struct {
ClusterNamer *utils.Namer
Expand All @@ -47,12 +42,10 @@ type ClusterManager struct {
firewallPool firewalls.SingleFirewallPool

// TODO: Refactor so we simply init a health check pool.
// Currently health checks are tied to backends because each backend needs
// the link of the associated health, but both the backend pool and
// loadbalancer pool manage backends, because the lifetime of the default
// backend is tied to the last/first loadbalancer not the life of the
// nodeport service or Ingress.
healthCheckers []healthchecks.HealthChecker
healthChecker healthchecks.HealthChecker

// defaultBackendSvcPort is the ServicePort for the system default backend.
defaultBackendSvcPort utils.ServicePort
}

// Init initializes the cluster manager.
Expand Down Expand Up @@ -179,25 +172,25 @@ func (c *ClusterManager) GC(lbNames []string, nodePorts []utils.ServicePort) err
// - namer: is the namer used to tag cluster wide shared resources.
// - defaultBackendNodePort: is the node port of glbc's default backend. This is
// the kubernetes Service that serves the 404 page if no urls match.
// - defaultHealthCheckPath: is the default path used for L7 health checks, eg: "/healthz".
// - healthCheckPath: is the default path used for L7 health checks, eg: "/healthz".
// - defaultBackendHealthCheckPath: is the default path used for the default backend health checks.
func NewClusterManager(
cloud *gce.GCECloud,
namer *utils.Namer,
defaultHealthCheckPath string) (*ClusterManager, error) {
defaultBackendSvcPort utils.ServicePort,
healthCheckPath string,
defaultBackendHealthCheckPath string) (*ClusterManager, error) {

// Names are fundamental to the cluster, the uid allocator makes sure names don't collide.
cluster := ClusterManager{ClusterNamer: namer}
cluster := ClusterManager{ClusterNamer: namer, defaultBackendSvcPort: defaultBackendSvcPort}

// NodePool stores GCE vms that are in this Kubernetes cluster.
cluster.instancePool = instances.NewNodePool(cloud, namer)

// BackendPool creates GCE BackendServices and associated health checks.
healthChecker := healthchecks.NewHealthChecker(cloud, defaultHealthCheckPath, cluster.ClusterNamer)
// Loadbalancer pool manages the default backend and its health check.
defaultBackendHealthChecker := healthchecks.NewHealthChecker(cloud, "/healthz", cluster.ClusterNamer)
cluster.healthChecker = healthchecks.NewHealthChecker(cloud, healthCheckPath, defaultBackendHealthCheckPath, cluster.ClusterNamer, defaultBackendSvcPort.SvcName)
cluster.backendPool = backends.NewBackendPool(cloud, cloud, cluster.healthChecker, cluster.instancePool, cluster.ClusterNamer, true)

cluster.healthCheckers = []healthchecks.HealthChecker{healthChecker, defaultBackendHealthChecker}
cluster.backendPool = backends.NewBackendPool(cloud, cloud, healthChecker, cluster.instancePool, cluster.ClusterNamer, true)
// L7 pool creates targetHTTPProxy, ForwardingRules, UrlMaps, StaticIPs.
cluster.l7Pool = loadbalancers.NewLoadBalancerPool(cloud, cluster.ClusterNamer)
cluster.firewallPool = firewalls.NewFirewallPool(cloud, cluster.ClusterNamer, gce.LoadBalancerSrcRanges(), flags.F.NodePortRanges.Values())
Expand Down
27 changes: 12 additions & 15 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,32 +84,29 @@ type LoadBalancerController struct {
hasSynced func() bool
// negEnabled indicates whether NEG feature is enabled.
negEnabled bool
// defaultBackendSvcPort is the ServicePort for the system default backend.
defaultBackendSvcPort utils.ServicePort
}

// NewLoadBalancerController creates a controller for gce loadbalancers.
// - kubeClient: A kubernetes REST client.
// - clusterManager: A ClusterManager capable of creating all cloud resources
// required for L7 loadbalancing.
// - resyncPeriod: Watchers relist from the Kubernetes API server this often.
func NewLoadBalancerController(kubeClient kubernetes.Interface, stopCh chan struct{}, ctx *context.ControllerContext, clusterManager *ClusterManager, negEnabled bool, defaultBackendSvcPort utils.ServicePort) (*LoadBalancerController, error) {
func NewLoadBalancerController(kubeClient kubernetes.Interface, stopCh chan struct{}, ctx *context.ControllerContext, clusterManager *ClusterManager, negEnabled bool) (*LoadBalancerController, error) {
broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(glog.Infof)
broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{
Interface: kubeClient.Core().Events(""),
})
lbc := LoadBalancerController{
client: kubeClient,
ctx: ctx,
ingLister: StoreToIngressLister{Store: ctx.IngressInformer.GetStore()},
nodeLister: ctx.NodeInformer.GetIndexer(),
nodes: NewNodeController(ctx, clusterManager),
CloudClusterManager: clusterManager,
stopCh: stopCh,
hasSynced: ctx.HasSynced,
negEnabled: negEnabled,
defaultBackendSvcPort: defaultBackendSvcPort,
client: kubeClient,
ctx: ctx,
ingLister: StoreToIngressLister{Store: ctx.IngressInformer.GetStore()},
nodeLister: ctx.NodeInformer.GetIndexer(),
nodes: NewNodeController(ctx, clusterManager),
CloudClusterManager: clusterManager,
stopCh: stopCh,
hasSynced: ctx.HasSynced,
negEnabled: negEnabled,
}
lbc.ingQueue = utils.NewPeriodicTaskQueue("ingresses", lbc.sync)

Expand Down Expand Up @@ -286,7 +283,7 @@ func (lbc *LoadBalancerController) sync(key string) (retErr error) {
}

func (lbc *LoadBalancerController) ensureIngress(key string, ing *extensions.Ingress, nodeNames []string, gceSvcPorts []utils.ServicePort) error {
urlMap := lbc.Translator.TranslateIngress(ing, lbc.defaultBackendSvcPort)
urlMap := lbc.Translator.TranslateIngress(ing, lbc.CloudClusterManager.defaultBackendSvcPort)
ingSvcPorts := urlMap.AllServicePorts()
igs, err := lbc.CloudClusterManager.EnsureInstanceGroupsAndPorts(nodeNames, ingSvcPorts)
if err != nil {
Expand Down Expand Up @@ -448,7 +445,7 @@ func updateAnnotations(client kubernetes.Interface, name, namespace string, anno
func (lbc *LoadBalancerController) ToSvcPorts(ings *extensions.IngressList) []utils.ServicePort {
var knownPorts []utils.ServicePort
for _, ing := range ings.Items {
urlMap := lbc.Translator.TranslateIngress(&ing, lbc.defaultBackendSvcPort)
urlMap := lbc.Translator.TranslateIngress(&ing, lbc.CloudClusterManager.defaultBackendSvcPort)
knownPorts = append(knownPorts, urlMap.AllServicePorts()...)
}
return knownPorts
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func newLoadBalancerController(t *testing.T, cm *fakeClusterManager) *LoadBalanc
kubeClient := fake.NewSimpleClientset()
stopCh := make(chan struct{})
ctx := context.NewControllerContext(kubeClient, api_v1.NamespaceAll, 1*time.Second, true)
lb, err := NewLoadBalancerController(kubeClient, stopCh, ctx, cm.ClusterManager, true, testDefaultBeNodePort)
lb, err := NewLoadBalancerController(kubeClient, stopCh, ctx, cm.ClusterManager, true)
if err != nil {
t.Fatalf("%v", err)
}
Expand Down Expand Up @@ -155,7 +155,7 @@ func gceURLMapFromPrimitive(primitiveMap utils.PrimitivePathMap, pm *nodePortMan
}
urlMap.PutPathRulesForHost(hostname, pathRules)
}
urlMap.DefaultBackend = testDefaultBeNodePort
urlMap.DefaultBackend = testDefaultBeSvcPort
return urlMap
}

Expand Down
26 changes: 16 additions & 10 deletions pkg/controller/fakes.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controller

import (
compute "google.golang.org/api/compute/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"

Expand All @@ -32,10 +33,14 @@ import (
)

var (
testDefaultBeNodePort = utils.ServicePort{NodePort: 30000, Protocol: annotations.ProtocolHTTP}
testBackendPort = intstr.IntOrString{Type: intstr.Int, IntVal: 80}
testSrcRanges = []string{"1.1.1.1/20"}
testNodePortRanges = []string{"30000-32767"}
testDefaultBeSvcPort = utils.ServicePort{
NodePort: 30000,
Protocol: annotations.ProtocolHTTP,
SvcName: types.NamespacedName{Namespace: "system", Name: "default"},
}
testBackendPort = intstr.IntOrString{Type: intstr.Int, IntVal: 80}
testSrcRanges = []string{"1.1.1.1/20"}
testNodePortRanges = []string{"30000-32767"}
)

// ClusterManager fake
Expand All @@ -59,7 +64,7 @@ func NewFakeClusterManager(clusterName, firewallName string) *fakeClusterManager
nodePool := instances.NewNodePool(fakeIGs, namer)
nodePool.Init(&instances.FakeZoneLister{Zones: []string{"zone-a"}})

healthChecker := healthchecks.NewHealthChecker(fakeHCP, "/", namer)
healthChecker := healthchecks.NewHealthChecker(fakeHCP, "/", "/healthz", namer, testDefaultBeSvcPort.SvcName)

backendPool := backends.NewBackendPool(
fakeBackends,
Expand All @@ -68,11 +73,12 @@ func NewFakeClusterManager(clusterName, firewallName string) *fakeClusterManager
l7Pool := loadbalancers.NewLoadBalancerPool(fakeLbs, namer)
frPool := firewalls.NewFirewallPool(firewalls.NewFakeFirewallsProvider(false, false), namer, testSrcRanges, testNodePortRanges)
cm := &ClusterManager{
ClusterNamer: namer,
instancePool: nodePool,
backendPool: backendPool,
l7Pool: l7Pool,
firewallPool: frPool,
ClusterNamer: namer,
instancePool: nodePool,
backendPool: backendPool,
l7Pool: l7Pool,
firewallPool: frPool,
defaultBackendSvcPort: testDefaultBeSvcPort,
}
return &fakeClusterManager{cm, fakeLbs, fakeBackends, fakeIGs, namer}
}
40 changes: 22 additions & 18 deletions pkg/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,24 +38,25 @@ const (
var (
// F are global flags for the controller.
F = struct {
APIServerHost string
ClusterName string
ConfigFilePath string
DefaultSvc string
DeleteAllOnQuit bool
GCERateLimit RateLimitSpecs
HealthCheckPath string
HealthzPort int
Features *Features
InCluster bool
IngressClass string
KubeConfigFile string
ResyncPeriod time.Duration
Verbose bool
Version bool
WatchNamespace string
NodePortRanges PortRanges
EnableBackendConfig bool
APIServerHost string
ClusterName string
ConfigFilePath string
DefaultSvcHealthCheckPath string
DefaultSvc string
DeleteAllOnQuit bool
GCERateLimit RateLimitSpecs
HealthCheckPath string
HealthzPort int
Features *Features
InCluster bool
IngressClass string
KubeConfigFile string
ResyncPeriod time.Duration
Verbose bool
Version bool
WatchNamespace string
NodePortRanges PortRanges
EnableBackendConfig bool
}{}
)

Expand Down Expand Up @@ -97,6 +98,9 @@ resources.`)
flag.StringVar(&F.ConfigFilePath, "config-file-path", "",
`Path to a file containing the gce config. If left unspecified this
controller only works with default zones.`)
flag.StringVar(&F.DefaultSvcHealthCheckPath, "default-backend-health-check-path", "/healthz",
`Path used to health-check a default backend service. The default backend service
must serve a 200 page on this path.`)
flag.StringVar(&F.DefaultSvc, "default-backend-service", "kube-system/default-http-backend",
`Service used to serve a 404 page for the default backend. Takes the
form namespace/name. The controller uses the first node port of this Service for
Expand Down
Loading

0 comments on commit 04b548e

Please sign in to comment.