Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

K8s-NEG Integration #48

Merged
merged 7 commits into from
Oct 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions cmd/glbc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ import (
"k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/controller"
"k8s.io/ingress-gce/pkg/loadbalancers"
neg "k8s.io/ingress-gce/pkg/networkendpointgroup"
"k8s.io/ingress-gce/pkg/storage"
"k8s.io/ingress-gce/pkg/utils"

"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
)
Expand Down Expand Up @@ -288,11 +290,10 @@ func main() {
// Create fake cluster manager
clusterManager = controller.NewFakeClusterManager(*clusterName, controller.DefaultFirewallName).ClusterManager
}

ctx := context.NewControllerContext(kubeClient, *watchNamespace, *resyncPeriod, false)

enableNEG := cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureNetworkEndpointGroup)
ctx := context.NewControllerContext(kubeClient, *watchNamespace, *resyncPeriod, enableNEG)
// Start loadbalancer controller
lbc, err := controller.NewLoadBalancerController(kubeClient, ctx, clusterManager)
lbc, err := controller.NewLoadBalancerController(kubeClient, ctx, clusterManager, enableNEG)
if err != nil {
glog.Fatalf("%v", err)
}
Expand All @@ -301,6 +302,13 @@ func main() {
glog.V(3).Infof("Cluster name %+v", clusterManager.ClusterNamer.GetClusterName())
}
clusterManager.Init(&controller.GCETranslator{LoadBalancerController: lbc})

// Start NEG controller
if enableNEG {
negController, _ := neg.NewController(kubeClient, cloud, ctx, lbc.Translator, namer, *resyncPeriod)
go negController.Run(ctx.StopCh)
}

go registerHandlers(lbc)
go handleSigterm(lbc, *deleteAllOnQuit)

Expand Down
113 changes: 102 additions & 11 deletions pkg/backends/backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/golang/glog"

computealpha "google.golang.org/api/compute/v0.alpha"
compute "google.golang.org/api/compute/v1"

v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -76,6 +77,7 @@ const maxRPS = 1
// Backends implements BackendPool.
type Backends struct {
cloud BackendServices
negGetter NEGGetter
nodePool instances.NodePool
healthChecker healthchecks.HealthChecker
snapshotter storage.Snapshotter
Expand All @@ -93,10 +95,14 @@ func portKey(port int64) string {

// ServicePort for tupling port and protocol
type ServicePort struct {
Port int64
Protocol utils.AppProtocol
SvcName types.NamespacedName
SvcPort intstr.IntOrString
// Port is the service node port
// TODO: rename it to NodePort
Port int64
Protocol utils.AppProtocol
SvcName types.NamespacedName
SvcPort intstr.IntOrString
SvcTargetPort string
NEGEnabled bool
}

// Description returns a string describing the ServicePort.
Expand All @@ -116,6 +122,7 @@ func (sp ServicePort) Description() string {
// - resyncWithCloud: if true, periodically syncs with cloud resources.
func NewBackendPool(
cloud BackendServices,
negGetter NEGGetter,
healthChecker healthchecks.HealthChecker,
nodePool instances.NodePool,
namer *utils.Namer,
Expand All @@ -128,6 +135,7 @@ func NewBackendPool(
}
backendPool := &Backends{
cloud: cloud,
negGetter: negGetter,
nodePool: nodePool,
healthChecker: healthChecker,
namer: namer,
Expand Down Expand Up @@ -171,8 +179,7 @@ func (b *Backends) Get(port int64) (*compute.BackendService, error) {
}

func (b *Backends) ensureHealthCheck(sp ServicePort) (string, error) {
hc := b.healthChecker.New(sp.Port, sp.Protocol)

hc := b.healthChecker.New(sp.Port, sp.Protocol, sp.NEGEnabled)
existingLegacyHC, err := b.healthChecker.GetLegacy(sp.Port)
if err != nil && !utils.IsNotFoundError(err) {
return "", err
Expand Down Expand Up @@ -271,7 +278,15 @@ func (b *Backends) ensureBackendService(p ServicePort, igs []*compute.InstanceGr
if len(be.HealthChecks) == 1 {
existingHCLink = be.HealthChecks[0]
}
if be.Protocol != string(p.Protocol) || existingHCLink != hcLink || be.Description != p.Description() {

// Compare health check name instead of health check link.
// This is because health check link contains api version.
// For NEG, the api version for health check will be alpha.
// Hence, it will cause the health check links to be always different
// TODO (mixia): compare health check link directly once NEG is GA
existingHCName := retrieveObjectName(existingHCLink)
expectedHCName := retrieveObjectName(hcLink)
if be.Protocol != string(p.Protocol) || existingHCName != expectedHCName || be.Description != p.Description() {
glog.V(2).Infof("Updating backend protocol %v (%v) for change in protocol (%v) or health check", beName, be.Protocol, string(p.Protocol))
be.Protocol = string(p.Protocol)
be.HealthChecks = []string{hcLink}
Expand All @@ -293,6 +308,10 @@ func (b *Backends) ensureBackendService(p ServicePort, igs []*compute.InstanceGr
return nil
}

// If NEG is enabled, do not link backend service to instance groups.
if p.NEGEnabled {
return nil
}
// Verify that backend service contains links to all backends/instance-groups
return b.edgeHop(be, igs)
}
Expand Down Expand Up @@ -352,6 +371,19 @@ func getBackendsForIGs(igs []*compute.InstanceGroup, bm BalancingMode) []*comput
return backends
}

func getBackendsForNEGs(negs []*computealpha.NetworkEndpointGroup) []*computealpha.Backend {
var backends []*computealpha.Backend
for _, neg := range negs {
b := &computealpha.Backend{
Group: neg.SelfLink,
BalancingMode: string(Rate),
MaxRatePerEndpoint: maxRPS,
}
backends = append(backends, b)
}
return backends
}

// edgeHop checks the links of the given backend by executing an edge hop.
// It fixes broken links.
func (b *Backends) edgeHop(be *compute.BackendService, igs []*compute.InstanceGroup) error {
Expand All @@ -369,7 +401,15 @@ func (b *Backends) edgeHop(be *compute.BackendService, igs []*compute.InstanceGr
glog.V(2).Infof("Updating backend service %v with %d backends: expected igs %+v, current igs %+v",
be.Name, igLinks.Len(), igLinks.List(), beIGs.List())

originalBackends := be.Backends
originalIGBackends := []*compute.Backend{}
for _, backend := range be.Backends {
// Backend service is not able to point to NEG and IG at the same time.
// Filter IG backends here.
if strings.Contains(backend.Group, "instanceGroups") {
originalIGBackends = append(originalIGBackends, backend)
}
}

var addIGs []*compute.InstanceGroup
for _, ig := range igs {
if !beIGs.Has(ig.SelfLink) {
Expand All @@ -387,7 +427,7 @@ func (b *Backends) edgeHop(be *compute.BackendService, igs []*compute.InstanceGr
for _, bm := range []BalancingMode{Rate, Utilization} {
// Generate backends with given instance groups with a specific mode
newBackends := getBackendsForIGs(addIGs, bm)
be.Backends = append(originalBackends, newBackends...)
be.Backends = append(originalIGBackends, newBackends...)

if err := b.cloud.UpdateGlobalBackendService(be); err != nil {
if utils.IsHTTPErrorCode(err, http.StatusBadRequest) {
Expand Down Expand Up @@ -457,6 +497,46 @@ func (b *Backends) Status(name string) string {
return hs.HealthStatus[0].HealthState
}

func (b *Backends) Link(port ServicePort, zones []string) error {
if !port.NEGEnabled {
return nil
}
negName := b.namer.NEGName(port.SvcName.Namespace, port.SvcName.Name, port.SvcTargetPort)
var negs []*computealpha.NetworkEndpointGroup
var err error
for _, zone := range zones {
neg, err := b.negGetter.GetNetworkEndpointGroup(negName, zone)
if err != nil {
return err
}
negs = append(negs, neg)
}

backendService, err := b.cloud.GetAlphaGlobalBackendService(b.namer.BeName(port.Port))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have a note somewhere saying the services must still be type NodePort? May be ideal to have a neg-design.md file that documents behavior, caveats, and TODOs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a NEG to beta doc that included this.

if err != nil {
return err
}

targetBackends := getBackendsForNEGs(negs)
oldBackends := sets.NewString()
newBackends := sets.NewString()

// WARNING: the backend link includes api version.
// API versions has to match, otherwise backend link will be always different.
for _, be := range backendService.Backends {
oldBackends.Insert(be.Group)
}
for _, be := range targetBackends {
newBackends.Insert(be.Group)
}

if !oldBackends.Equal(newBackends) {
backendService.Backends = targetBackends
return b.cloud.UpdateAlphaGlobalBackendService(backendService)
}
return nil
}

func applyLegacyHCToHC(existing *compute.HttpHealthCheck, hc *healthchecks.HealthCheck) {
hc.Description = existing.Description
hc.CheckIntervalSec = existing.CheckIntervalSec
Expand All @@ -482,10 +562,21 @@ func applyProbeSettingsToHC(p *v1.Probe, hc *healthchecks.HealthCheck) {
break
}
}

hc.RequestPath = healthPath
hc.Host = host
hc.Description = "Kubernetes L7 health check generated with readiness probe settings."
hc.CheckIntervalSec = int64(p.PeriodSeconds) + int64(healthchecks.DefaultHealthCheckInterval.Seconds())
hc.TimeoutSec = int64(p.TimeoutSeconds)
if hc.ForNEG {
// For NEG mode, we can support more aggresive healthcheck interval.
hc.CheckIntervalSec = int64(p.PeriodSeconds)
} else {
// For IG mode, short healthcheck interval may health check flooding problem.
hc.CheckIntervalSec = int64(p.PeriodSeconds) + int64(healthchecks.DefaultHealthCheckInterval.Seconds())
}
}

//retrieveObjectName takes a GCE object link and return the last part of the url as object name
func retrieveObjectName(url string) string {
splited := strings.Split(url, "/")
return splited[len(splited)-1]
}
Loading