Skip to content

Commit

Permalink
Adding logic to GCE ingress controller to handle multi cluster ingresses
Browse files Browse the repository at this point in the history
  • Loading branch information
nikhiljindal committed Aug 8, 2017
1 parent cf732e8 commit ab61605
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 31 deletions.
2 changes: 1 addition & 1 deletion controllers/gce/backends/backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (b *Backends) Add(p ServicePort) error {
be := &compute.BackendService{}
defer func() { b.snapshotter.Add(portKey(p.Port), be) }()

igs, namedPort, err := b.nodePool.AddInstanceGroup(b.namer.IGName(), p.Port)
igs, namedPort, err := instances.CreateInstanceGroups(b.nodePool, b.namer, p.Port)
if err != nil {
return err
}
Expand Down
23 changes: 22 additions & 1 deletion controllers/gce/controller/cluster_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (

"github.com/golang/glog"

compute "google.golang.org/api/compute/v1"

"k8s.io/kubernetes/pkg/cloudprovider"
gce "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"

Expand Down Expand Up @@ -141,7 +143,7 @@ func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeName
if err := c.backendPool.Sync(nodePorts); err != nil {
return err
}
if err := c.instancePool.Sync(nodeNames); err != nil {
if err := c.SyncNodesInInstanceGroups(nodeNames); err != nil {
return err
}
if err := c.l7Pool.Sync(lbs); err != nil {
Expand Down Expand Up @@ -170,6 +172,25 @@ func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeName
return nil
}

func (c *ClusterManager) CreateInstanceGroups(servicePorts []backends.ServicePort) ([]*compute.InstanceGroup, error) {
var igs []*compute.InstanceGroup
var err error
for _, p := range servicePorts {
igs, _, err = instances.CreateInstanceGroups(c.instancePool, c.ClusterNamer, p.Port)
if err != nil {
return nil, err
}
}
return igs, nil
}

func (c *ClusterManager) SyncNodesInInstanceGroups(nodeNames []string) error {
if err := c.instancePool.Sync(nodeNames); err != nil {
return err
}
return nil
}

// GC garbage collects unused resources.
// - lbNames are the names of L7 loadbalancers we wish to exist. Those not in
// this list are removed from the cloud.
Expand Down
60 changes: 53 additions & 7 deletions controllers/gce/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, clusterManager *
pathHandlers := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
addIng := obj.(*extensions.Ingress)
if !isGCEIngress(addIng) {
if !isGCEIngress(addIng) && !isGCEMultiClusterIngress(addIng) {
glog.Infof("Ignoring add for ingress %v based on annotation %v", addIng.Name, ingressClassKey)
return
}
Expand All @@ -120,7 +120,7 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, clusterManager *
},
DeleteFunc: func(obj interface{}) {
delIng := obj.(*extensions.Ingress)
if !isGCEIngress(delIng) {
if !isGCEIngress(delIng) && !isGCEMultiClusterIngress(delIng) {
glog.Infof("Ignoring delete for ingress %v based on annotation %v", delIng.Name, ingressClassKey)
return
}
Expand All @@ -129,7 +129,7 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, clusterManager *
},
UpdateFunc: func(old, cur interface{}) {
curIng := cur.(*extensions.Ingress)
if !isGCEIngress(curIng) {
if !isGCEIngress(curIng) && !isGCEMultiClusterIngress(curIng) {
return
}
if !reflect.DeepEqual(old, cur) {
Expand Down Expand Up @@ -306,6 +306,13 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
glog.V(3).Infof("Finished syncing %v", key)
}()

if ingExists {
ing := obj.(*extensions.Ingress)
if isGCEMultiClusterIngress(ing) {
return lbc.syncMultiClusterIngress(ing, nodeNames)
}
}

// Record any errors during sync and throw a single error at the end. This
// allows us to free up associated cloud resources ASAP.
if err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, nodePorts); err != nil {
Expand Down Expand Up @@ -342,6 +349,36 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
return syncError
}

func (lbc *LoadBalancerController) syncMultiClusterIngress(ing *extensions.Ingress, nodeNames []string) error {
// For multi cluster ingress, we only need to manage the instance groups and named ports on those instance groups.

// Ensure that all the required instance groups exist with the required node ports.
nodePorts := lbc.tr.ingressToNodePorts(ing)
// Add the default backend node port.
nodePorts = append(nodePorts, lbc.CloudClusterManager.defaultBackendNodePort)
igs, err := lbc.CloudClusterManager.CreateInstanceGroups(nodePorts)
if err != nil {
return err
}

// Ensure that instance groups have the right nodes.
// This is also done whenever a node is added or removed from the cluster.
// We need it here as well since instance group is not created until first ingress is observed.
if err := lbc.CloudClusterManager.SyncNodesInInstanceGroups(nodeNames); err != nil {
return err
}

// Add instance group names as annotation on the ingress.
annotations, err := addInstanceGroupsAnnotation(ing.Annotations, igs)
if err != nil {
return err
}
if err := lbc.updateAnnotations(ing.Name, ing.Namespace, annotations); err != nil {
return err
}
return nil
}

// updateIngressStatus updates the IP and annotations of a loadbalancer.
// The annotations are parsed by kubectl describe.
func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing extensions.Ingress) error {
Expand Down Expand Up @@ -372,14 +409,23 @@ func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing
lbc.recorder.Eventf(currIng, api_v1.EventTypeNormal, "CREATE", "ip: %v", ip)
}
}
annotations := loadbalancers.GetLBAnnotations(l7, currIng.Annotations, lbc.CloudClusterManager.backendPool)
if err := lbc.updateAnnotations(ing.Name, ing.Namespace, annotations); err != nil {
return err
}
return nil
}

func (lbc *LoadBalancerController) updateAnnotations(name, namespace string, annotations map[string]string) error {
// Update annotations through /update endpoint
currIng, err = ingClient.Get(ing.Name, metav1.GetOptions{})
ingClient := lbc.client.Extensions().Ingresses(namespace)
currIng, err := ingClient.Get(name, metav1.GetOptions{})
if err != nil {
return err
}
currIng.Annotations = loadbalancers.GetLBAnnotations(l7, currIng.Annotations, lbc.CloudClusterManager.backendPool)
if !reflect.DeepEqual(ing.Annotations, currIng.Annotations) {
glog.V(3).Infof("Updating annotations of %v/%v", ing.Namespace, ing.Name)
if !reflect.DeepEqual(currIng.Annotations, annotations) {
glog.V(3).Infof("Updating annotations of %v/%v", namespace, name)
currIng.Annotations = annotations
if _, err := ingClient.Update(currIng); err != nil {
return err
}
Expand Down
86 changes: 64 additions & 22 deletions controllers/gce/controller/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/golang/glog"

compute "google.golang.org/api/compute/v1"

api_v1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -75,12 +76,19 @@ const (
// ingressClassKey picks a specific "class" for the Ingress. The controller
// only processes Ingresses with this annotation either unset, or set
// to either gceIngessClass or the empty string.
ingressClassKey = "kubernetes.io/ingress.class"
gceIngressClass = "gce"
ingressClassKey = "kubernetes.io/ingress.class"
gceIngressClass = "gce"
gceMultiIngressClass = "gce-multi"

// Label key to denote which GCE zone a Kubernetes node is in.
zoneKey = "failure-domain.beta.kubernetes.io/zone"
defaultZone = ""

// instanceGroupsKey is the annotation key used by controller to specify the
// name and zone of instance groups created for the ingress.
// This is read only for users. Controller will overrite any user updates.
// This is only set for ingresses with ingressClass = "gce-multi"
instanceGroupsKey = "ingress.gcp.kubernetes.io/instance-groups"
)

// ingAnnotations represents Ingress annotations.
Expand Down Expand Up @@ -156,6 +164,13 @@ func isGCEIngress(ing *extensions.Ingress) bool {
return class == "" || class == gceIngressClass
}

// isGCEMultiClusterIngress returns true if the given Ingress has
// ingress.class annotation set to "gce-multi".
func isGCEMultiClusterIngress(ing *extensions.Ingress) bool {
class := ingAnnotations(ing.ObjectMeta.Annotations).ingressClass()
return class == gceMultiIngressClass
}

// errorNodePortNotFound is an implementation of error.
type errorNodePortNotFound struct {
backend extensions.IngressBackend
Expand Down Expand Up @@ -471,32 +486,39 @@ PortLoop:
return p, nil
}

// toNodePorts converts a pathlist to a flat list of nodeports.
// toNodePorts is a helper method over ingressToNodePorts to process a list of ingresses.
func (t *GCETranslator) toNodePorts(ings *extensions.IngressList) []backends.ServicePort {
var knownPorts []backends.ServicePort
for _, ing := range ings.Items {
defaultBackend := ing.Spec.Backend
if defaultBackend != nil {
port, err := t.getServiceNodePort(*defaultBackend, ing.Namespace)
knownPorts = append(knownPorts, t.ingressToNodePorts(&ing)...)
}
return knownPorts
}

// ingressToNodePorts converts a pathlist to a flat list of nodeports for the given ingress.
func (t *GCETranslator) ingressToNodePorts(ing *extensions.Ingress) []backends.ServicePort {
var knownPorts []backends.ServicePort
defaultBackend := ing.Spec.Backend
if defaultBackend != nil {
port, err := t.getServiceNodePort(*defaultBackend, ing.Namespace)
if err != nil {
glog.Infof("%v", err)
} else {
knownPorts = append(knownPorts, port)
}
}
for _, rule := range ing.Spec.Rules {
if rule.HTTP == nil {
glog.Errorf("ignoring non http Ingress rule")
return knownPorts
}
for _, path := range rule.HTTP.Paths {
port, err := t.getServiceNodePort(path.Backend, ing.Namespace)
if err != nil {
glog.Infof("%v", err)
} else {
knownPorts = append(knownPorts, port)
}
}
for _, rule := range ing.Spec.Rules {
if rule.HTTP == nil {
glog.Errorf("ignoring non http Ingress rule")
continue
}
for _, path := range rule.HTTP.Paths {
port, err := t.getServiceNodePort(path.Backend, ing.Namespace)
if err != nil {
glog.Infof("%v", err)
continue
}
knownPorts = append(knownPorts, port)
return knownPorts
}
knownPorts = append(knownPorts, port)
}
}
return knownPorts
Expand Down Expand Up @@ -640,3 +662,23 @@ func (o PodsByCreationTimestamp) Less(i, j int) bool {
}
return o[i].CreationTimestamp.Before(o[j].CreationTimestamp)
}

func addInstanceGroupsAnnotation(existing map[string]string, igs []*compute.InstanceGroup) (map[string]string, error) {
if existing == nil {
existing = map[string]string{}
}
type Value struct {
Name string
Zone string
}
instanceGroups := []Value{}
for _, ig := range igs {
instanceGroups = append(instanceGroups, Value{Name: ig.Name, Zone: ig.Zone})
}
jsonValue, err := json.Marshal(instanceGroups)
if err != nil {
return existing, err
}
existing[instanceGroupsKey] = string(jsonValue)
return existing, nil
}
18 changes: 18 additions & 0 deletions controllers/gce/controller/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"testing"
"time"

compute "google.golang.org/api/compute/v1"

api_v1 "k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
Expand Down Expand Up @@ -263,3 +265,19 @@ func addNodes(lbc *LoadBalancerController, zoneToNode map[string][]string) {
func getProbePath(p *api_v1.Probe) string {
return p.Handler.HTTPGet.Path
}

func TestAddInstanceGroupsAnnotation(t *testing.T) {
igs := []*compute.InstanceGroup{&compute.InstanceGroup{
Name: "ig-name",
Zone: "https://www.googleapis.com/compute/v1/projects/my-project/zones/us-central1-b",
},
}
expectedAnnotation := `[{"Name":"ig-name","Zone":"https://www.googleapis.com/compute/v1/projects/my-project/zones/us-central1-b"}]`
annotations, err := addInstanceGroupsAnnotation(nil, igs)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if annotations[instanceGroupsKey] != expectedAnnotation {
t.Fatalf("Unexpected annotation value: %s, expected: %s", annotations[instanceGroupsKey], expectedAnnotation)
}
}
13 changes: 13 additions & 0 deletions controllers/gce/instances/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package instances

import (
compute "google.golang.org/api/compute/v1"

"k8s.io/ingress/controllers/gce/utils"
)

// Helper method to create instance groups.
// This method exists to ensure that we are using the same logic at all places.
func CreateInstanceGroups(nodePool NodePool, namer *utils.Namer, port int64) ([]*compute.InstanceGroup, *compute.NamedPort, error) {
return nodePool.AddInstanceGroup(namer.IGName(), port)
}

0 comments on commit ab61605

Please sign in to comment.