Skip to content

Commit

Permalink
Merge pull request #428 from rramkumar1/syncer-interface
Browse files Browse the repository at this point in the history
Introduce a new interface to encapsulate Ingress sync and controller implementation of the sync
  • Loading branch information
k8s-ci-robot committed Aug 23, 2018
2 parents 439421d + 6f8e6d6 commit 7adac05
Show file tree
Hide file tree
Showing 4 changed files with 238 additions and 118 deletions.
256 changes: 139 additions & 117 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"time"

"github.com/golang/glog"
compute "google.golang.org/api/compute/v1"

apiv1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
Expand All @@ -44,10 +43,18 @@ import (
"k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/controller/translator"
"k8s.io/ingress-gce/pkg/loadbalancers"
ingsync "k8s.io/ingress-gce/pkg/sync"
"k8s.io/ingress-gce/pkg/tls"
"k8s.io/ingress-gce/pkg/utils"
)

// GarbageCollectionState is created by this controller for the purpose
// of garbage collecting GCLB resources during the sync of an Ingress.
type GarbageCollectionState struct {
	// lbNames are the names of the L7 load balancers that should continue to exist;
	// load balancers not in this list are candidates for deletion.
	lbNames []string
	// svcPorts are the ServicePorts whose backends should continue to exist;
	// backends for ports not in this list are candidates for deletion.
	svcPorts []utils.ServicePort
}

// LoadBalancerController watches the kubernetes api and adds/removes services
// from the loadbalancer, via loadBalancerConfig.
type LoadBalancerController struct {
Expand Down Expand Up @@ -81,6 +88,9 @@ type LoadBalancerController struct {
// linker implementations for backends
negLinker backends.Linker
igLinker backends.Linker

// Ingress sync + GC implementation
ingSyncer ingsync.Syncer
}

// NewLoadBalancerController creates a controller for gce loadbalancers.
Expand Down Expand Up @@ -111,6 +121,7 @@ func NewLoadBalancerController(
negLinker: backends.NewNEGLinker(backendPool, ctx.Cloud, ctx.ClusterNamer),
igLinker: backends.NewInstanceGroupLinker(instancePool, backendPool, ctx.ClusterNamer),
}
lbc.ingSyncer = ingsync.NewIngressSyncer(&lbc)

lbc.ingQueue = utils.NewPeriodicTaskQueue("ingresses", lbc.sync)

Expand Down Expand Up @@ -257,68 +268,36 @@ func (lbc *LoadBalancerController) Stop(deleteAll bool) error {
return nil
}

// sync manages Ingress create/updates/deletes
func (lbc *LoadBalancerController) sync(key string) error {
if !lbc.hasSynced() {
time.Sleep(context.StoreSyncPollPeriod)
return fmt.Errorf("waiting for stores to sync")
// PreProcess implements Controller. It translates the Ingress spec into a
// GCEURLMap, which is returned as the opaque state threaded through the
// remaining sync steps.
func (lbc *LoadBalancerController) PreProcess(ing *extensions.Ingress) (interface{}, error) {
	urlMap, errs := lbc.Translator.TranslateIngress(ing, lbc.ctx.DefaultBackendSvcPortID)
	if errs != nil {
		// Return nil (not "") so the caller sees a zero-value state on error.
		return nil, fmt.Errorf("error while evaluating the ingress spec: %v", joinErrs(errs))
	}
	return urlMap, nil
}

gceIngresses, err := lbc.ingLister.ListGCEIngresses()
if err != nil {
return err
// SyncBackends implements Controller.
func (lbc *LoadBalancerController) SyncBackends(ing *extensions.Ingress, state interface{}) error {
// We expect state to be a utils.GCEURLMap
urlMap, ok := state.(*utils.GCEURLMap)
if !ok {
return fmt.Errorf("expected state type to be GCEURLMap, type was %T", state)
}
// gceSvcPorts contains the ServicePorts used by only single-cluster ingress.
gceSvcPorts := lbc.ToSvcPorts(&gceIngresses)
nodeNames, err := utils.GetReadyNodeNames(listers.NewNodeLister(lbc.nodeLister))
ingSvcPorts := urlMap.AllServicePorts()

// Create instance groups and set named ports.
igs, err := lbc.instancePool.EnsureInstanceGroupsAndPorts(lbc.ctx.ClusterNamer.InstanceGroup(), nodePorts(ingSvcPorts))
if err != nil {
return err
}

lbNames := lbc.ingLister.Store.ListKeys()
obj, ingExists, err := lbc.ingLister.Store.GetByKey(key)
nodeNames, err := utils.GetReadyNodeNames(listers.NewNodeLister(lbc.nodeLister))
if err != nil {
return err
}
if !ingExists {
glog.V(2).Infof("Ingress %q no longer exists, triggering GC", key)
// GC will find GCE resources that were used for this ingress and delete them.
return lbc.gc(lbNames, gceSvcPorts)
}

// Get ingress and DeepCopy for assurance that we don't pollute other goroutines with changes.
ing, ok := obj.(*extensions.Ingress)
if !ok {
return fmt.Errorf("invalid object (not of type Ingress), type was %T", obj)
}
ing = ing.DeepCopy()

ensureErr := lbc.ensureIngress(ing, nodeNames)
if ensureErr != nil {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Sync", fmt.Sprintf("Error during sync: %v", ensureErr.Error()))
}

// Garbage collection will occur regardless of an error occurring. If an error occurred,
// it could have been caused by quota issues; therefore, garbage collecting now may
// free up enough quota for the next sync to pass.
if gcErr := lbc.gc(lbNames, gceSvcPorts); gcErr != nil {
glog.Errorf("error during end-of-sync GC %v", gcErr)
}

return ensureErr
}

func (lbc *LoadBalancerController) ensureIngress(ing *extensions.Ingress, nodeNames []string) error {
urlMap, errs := lbc.Translator.TranslateIngress(ing, lbc.ctx.DefaultBackendSvcPortID)
if errs != nil {
return fmt.Errorf("error while evaluating the ingress spec: %v", joinErrs(errs))
}

ingSvcPorts := urlMap.AllServicePorts()

igs, err := lbc.ensureInstanceGroupsAndPorts(ingSvcPorts, nodeNames)
if err != nil {
// Add/remove instances to the instance groups.
if err = lbc.instancePool.Sync(nodeNames); err != nil {
return err
}

Expand All @@ -334,7 +313,8 @@ func (lbc *LoadBalancerController) ensureIngress(ing *extensions.Ingress, nodeNa
if err = updateAnnotations(lbc.ctx.KubeClient, ing.Name, ing.Namespace, ing.Annotations); err != nil {
return err
}
return nil
// This short-circuit will stop the syncer from moving to next step.
return ingsync.ErrSkipBackendsSync
}

// Sync the backends
Expand Down Expand Up @@ -364,27 +344,130 @@ func (lbc *LoadBalancerController) ensureIngress(ing *extensions.Ingress, nodeNa
}
}

return nil
}

// GCBackends implements Controller. It garbage collects backends for all
// Ingresses, and deletes the cluster instance group once no load balancers
// remain that could reference it.
func (lbc *LoadBalancerController) GCBackends(state interface{}) error {
	// We expect state to be a GarbageCollectionState
	gcState, ok := state.(*GarbageCollectionState)
	if !ok {
		return fmt.Errorf("expected state type to be GarbageCollectionState, type was %T", state)
	}

	if err := lbc.backendSyncer.GC(gcState.svcPorts); err != nil {
		return err
	}

	// TODO(ingress#120): Move this to the backend pool so it mirrors creation
	if len(gcState.lbNames) == 0 {
		igName := lbc.ctx.ClusterNamer.InstanceGroup()
		glog.Infof("Deleting instance group %v", igName)
		// BUG FIX: the original compared `err != err` (always false), which
		// silently swallowed any DeleteInstanceGroup failure.
		if err := lbc.instancePool.DeleteInstanceGroup(igName); err != nil {
			return err
		}
	}
	return nil
}

// SyncLoadBalancer implements Controller. It ensures the frontend
// (higher-level) load balancer resources for the given Ingress, using the
// URL map carried in the opaque sync state.
func (lbc *LoadBalancerController) SyncLoadBalancer(ing *extensions.Ingress, state interface{}) error {
	// The state produced by PreProcess must be a *utils.GCEURLMap.
	m, valid := state.(*utils.GCEURLMap)
	if !valid {
		return fmt.Errorf("expected state type to be GCEURLMap, type was %T", state)
	}

	runtimeInfo, err := lbc.toRuntimeInfo(ing, m)
	if err != nil {
		return err
	}

	// Create higher-level LB resources.
	return lbc.l7Pool.Sync(runtimeInfo)
}

// GCLoadBalancers implements Controller. It garbage collects front-end load
// balancer resources for all Ingresses not named in the GC state.
func (lbc *LoadBalancerController) GCLoadBalancers(state interface{}) error {
	// The GC state must be a *GarbageCollectionState.
	gcs, valid := state.(*GarbageCollectionState)
	if !valid {
		return fmt.Errorf("expected state type to be GarbageCollectionState, type was %T", state)
	}
	return lbc.l7Pool.GC(gcs.lbNames)
}

// PostProcess implements Controller. It looks up the load balancer for the
// Ingress and propagates its IP/annotations back onto the Ingress status.
func (lbc *LoadBalancerController) PostProcess(ing *extensions.Ingress) error {
	// Get the loadbalancer and update the ingress status.
	// NOTE: removed a stray leftover line that referenced an undefined `lb`
	// and shadowed the l7 declaration below.
	k, err := utils.KeyFunc(ing)
	if err != nil {
		return fmt.Errorf("cannot get key for Ingress %v/%v: %v", ing.Namespace, ing.Name, err)
	}
	l7, err := lbc.l7Pool.Get(k)
	if err != nil {
		return fmt.Errorf("unable to get loadbalancer: %v", err)
	}
	if err := lbc.updateIngressStatus(l7, ing); err != nil {
		return fmt.Errorf("update ingress status error: %v", err)
	}

	return nil
}

// sync manages Ingress create/updates/deletes. It builds the GC state from
// all known Ingresses, delegates the per-Ingress sync to the Syncer, and
// always runs GC afterwards (even on sync failure, since GC may free quota
// needed by the next sync attempt).
func (lbc *LoadBalancerController) sync(key string) error {
	if !lbc.hasSynced() {
		time.Sleep(context.StoreSyncPollPeriod)
		return fmt.Errorf("waiting for stores to sync")
	}
	glog.V(3).Infof("Syncing %v", key)

	// Create state needed for GC.
	gceIngresses, err := lbc.ingLister.ListGCEIngresses()
	if err != nil {
		return err
	}
	// gceSvcPorts contains the ServicePorts used by only single-cluster ingress.
	gceSvcPorts := lbc.ToSvcPorts(&gceIngresses)
	lbNames := lbc.ingLister.Store.ListKeys()
	gcState := &GarbageCollectionState{lbNames, gceSvcPorts}

	obj, ingExists, err := lbc.ingLister.Store.GetByKey(key)
	// BUG FIX: the lookup error was previously ignored.
	if err != nil {
		return fmt.Errorf("error getting Ingress for key %q: %v", key, err)
	}
	if !ingExists {
		glog.V(2).Infof("Ingress %q no longer exists, triggering GC", key)
		// GC will find GCE resources that were used for this ingress and delete them.
		return lbc.ingSyncer.GC(gcState)
	}

	// Get ingress and DeepCopy for assurance that we don't pollute other goroutines with changes.
	ing, ok := obj.(*extensions.Ingress)
	if !ok {
		return fmt.Errorf("invalid object (not of type Ingress), type was %T", obj)
	}
	ing = ing.DeepCopy()

	syncErr := lbc.ingSyncer.Sync(ing)
	if syncErr != nil {
		lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Sync", fmt.Sprintf("Error during sync: %v", syncErr.Error()))
	}

	// Garbage collection will occur regardless of an error occurring. If an error occurred,
	// it could have been caused by quota issues; therefore, garbage collecting now may
	// free up enough quota for the next sync to pass.
	if gcErr := lbc.ingSyncer.GC(gcState); gcErr != nil {
		// BUG FIX: the GC error was previously formatted into a named return
		// (retErr) that was then discarded by `return syncErr`, losing it.
		return fmt.Errorf("error during sync %v, error during GC %v", syncErr, gcErr)
	}

	return syncErr
}

// updateIngressStatus updates the IP and annotations of a loadbalancer.
// The annotations are parsed by kubectl describe.
func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing *extensions.Ingress) error {
Expand Down Expand Up @@ -481,64 +564,3 @@ func (lbc *LoadBalancerController) ToSvcPorts(ings *extensions.IngressList) []ut
}
return knownPorts
}

// ensureInstanceGroupsAndPorts creates the cluster instance group (with its
// named ports) if any backend still relies on instance groups. When every
// service port has NEG enabled there is nothing to ensure and (nil, nil) is
// returned.
func (lbc *LoadBalancerController) ensureInstanceGroupsAndPorts(svcPorts []utils.ServicePort, nodeNames []string) ([]*compute.InstanceGroup, error) {
	namedPorts := nodePorts(svcPorts)
	if len(namedPorts) == 0 {
		glog.V(2).Infof("Skip ensuring instance groups as all backend(s) have NEG enabled.")
		return nil, nil
	}

	// Create instance groups and set named ports.
	groups, err := lbc.instancePool.EnsureInstanceGroupsAndPorts(lbc.ctx.ClusterNamer.InstanceGroup(), namedPorts)
	if err != nil {
		return nil, err
	}

	// Add/remove instances to the instance groups.
	if syncErr := lbc.instancePool.Sync(nodeNames); syncErr != nil {
		return nil, syncErr
	}

	return groups, nil
}

// gc garbage collects unused resources.
// - lbNames are the names of L7 loadbalancers we wish to exist. Those not in
// this list are removed from the cloud.
// - nodePorts are the ports for which we want BackendServies. BackendServices
// for ports not in this list are deleted.
// This method ignores googleapi 404 errors (StatusNotFound).
func (lbc *LoadBalancerController) gc(lbNames []string, svcPorts []utils.ServicePort) error {
	// On GC:
	//   * Loadbalancers need to get deleted before backends.
	//   * Backends are refcounted in a shared pool.
	//   * We always want to GC backends even if there was an error in GCing
	//     loadbalancers, because the next Sync could rely on the GC for quota.
	//   * There are at least 2 cases for backend GC:
	//     1. The loadbalancer has been deleted.
	//     2. An update to the url map drops the refcount of a backend. This can
	//        happen when an Ingress is updated, if we don't GC after the update
	//        we'll leak the backend.
	lbErr := lbc.l7Pool.GC(lbNames)
	beErr := lbc.backendSyncer.GC(svcPorts)
	if lbErr != nil {
		return lbErr
	}
	if beErr != nil {
		return beErr
	}

	// TODO(ingress#120): Move this to the backend pool so it mirrors creation
	if len(lbNames) == 0 || len(nodePorts(svcPorts)) == 0 {
		igName := lbc.ctx.ClusterNamer.InstanceGroup()
		glog.Infof("Deleting instance group %v", igName)
		// BUG FIX: the original compared `err != err` (always false), which
		// silently swallowed any DeleteInstanceGroup failure.
		if err := lbc.instancePool.DeleteInstanceGroup(igName); err != nil {
			return err
		}
		glog.V(2).Infof("Shutting down firewall as there are no loadbalancers")
	}

	return nil
}
2 changes: 1 addition & 1 deletion pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func TestEnsureMCIngress(t *testing.T) {
addIngress(lbc, ing)

ingStoreKey := getKey(ing, t)
if err := lbc.ensureIngress(ing, []string{"node-a", "node-b"}); err != nil {
if err := lbc.sync(ingStoreKey); err != nil {
t.Fatalf("lbc.sync(%v) = err %v", ingStoreKey, err)
}

Expand Down
38 changes: 38 additions & 0 deletions pkg/sync/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package sync

import (
extensions "k8s.io/api/extensions/v1beta1"
)

// Syncer is an interface to sync GCP resources associated with an Ingress.
type Syncer interface {
	// Sync creates a full GCLB given an Ingress.
	Sync(ing *extensions.Ingress) error
	// GC cleans up GCLB resources for all Ingresses and can optionally
	// use some arbitrary state to help with the process.
	// TODO(rramkumar): Do we need to rethink the strategy of GC'ing
	// all Ingresses at once?
	GC(state interface{}) error
}

// Controller is an interface for ingress controllers and declares methods
// on how to sync the various portions of the GCLB for an Ingress. A Syncer
// implementation is expected to invoke these methods in order during a sync.
type Controller interface {
	// PreProcess allows for doing some pre-processing on an Ingress before
	// it is synced. Some arbitrary state can also be returned; that state is
	// passed to the subsequent Sync* methods.
	PreProcess(ing *extensions.Ingress) (interface{}, error)
	// SyncBackends syncs the backends for a GCLB given an ingress or some
	// existing state.
	SyncBackends(ing *extensions.Ingress, state interface{}) error
	// GCBackends garbage collects backends for all Ingresses.
	GCBackends(state interface{}) error
	// SyncLoadBalancer syncs the front-end load balancer resources for a GCLB given
	// an ingress or some existing state.
	SyncLoadBalancer(ing *extensions.Ingress, state interface{}) error
	// GCLoadBalancers garbage collects front-end load balancer resources
	// for all Ingresses.
	GCLoadBalancers(state interface{}) error
	// PostProcess allows for doing some post-processing on an Ingress before
	// the overall sync is complete.
	PostProcess(ing *extensions.Ingress) error
}
Loading

0 comments on commit 7adac05

Please sign in to comment.