Refactor code to merge multi cluster ingress sync with single cluster ingress sync
nikhiljindal committed Sep 7, 2017
1 parent 7d87f02 commit 0f756ae
Showing 5 changed files with 92 additions and 66 deletions.
36 changes: 23 additions & 13 deletions controllers/gce/controller/cluster_manager.go
@@ -114,36 +114,46 @@ func (c *ClusterManager) shutdown() error {
 // TargetHttpProxy.
 // - nodeNames are the names of nodes we wish to add to all loadbalancer
 //   instance groups.
-// - nodePorts are the ports for which we require BackendServices. Each of
-//   these ports must also be opened on the corresponding Instance Group.
+// - backendServicePorts are the ports for which we require BackendServices.
+// - namedPorts are the ports which must be opened on instance groups.
 // If in performing the checkpoint the cluster manager runs out of quota, a
 // googleapi 403 is returned.
-func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, nodePorts []backends.ServicePort) error {
+func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, backendServicePorts []backends.ServicePort, namedPorts []backends.ServicePort) error {
+    if len(namedPorts) != 0 {
+        // Add the default backend node port to the list of named ports for instance groups.
+        namedPorts = append(namedPorts, c.defaultBackendNodePort)
+    }
     // Multiple ingress paths can point to the same service (and hence nodePort)
     // but each nodePort can only have one set of cloud resources behind it. So
     // don't waste time double validating GCE BackendServices.
-    portMap := map[int64]backends.ServicePort{}
-    for _, p := range nodePorts {
-        portMap[p.Port] = p
-    }
-    nodePorts = []backends.ServicePort{}
-    for _, sp := range portMap {
-        nodePorts = append(nodePorts, sp)
-    }
+    namedPorts = uniq(namedPorts)
+    backendServicePorts = uniq(backendServicePorts)
+    // Create Instance Groups.
+    _, err := c.CreateInstanceGroups(namedPorts)
+    if err != nil {
+        return err
+    }
-    if err := c.backendPool.Sync(nodePorts); err != nil {
+    if err := c.backendPool.Sync(backendServicePorts); err != nil {
        return err
     }

     if err := c.SyncNodesInInstanceGroups(nodeNames); err != nil {
        return err
     }
-    if err := c.l7Pool.Sync(lbs); err != nil {
+    singleClusterLbs := []*loadbalancers.L7RuntimeInfo{}
+    for _, lb := range lbs {
+        if !lb.IsMultiCluster {
+            singleClusterLbs = append(singleClusterLbs, lb)
+        }
+    }
+    if err := c.l7Pool.Sync(singleClusterLbs); err != nil {
        return err
     }

     // TODO: Manage default backend and its firewall rule in a centralized way.
     // DefaultBackend is managed in l7 pool, which doesn't understand instances,
     // which the firewall rule requires.
-    fwNodePorts := nodePorts
+    fwNodePorts := backendServicePorts
     if len(lbs) != 0 {
        // If there are no Ingresses, we shouldn't be allowing traffic to the
        // default backend. Equally importantly if the cluster gets torn down
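The single-cluster filter above is the heart of the merge: Checkpoint now does the shared work (instance groups, named ports, backends, firewall) for every ingress, but hands only single-cluster loadbalancers to the L7 pool. A minimal, runnable sketch of that filter, with L7RuntimeInfo reduced to a hypothetical stand-in struct:

package main

import "fmt"

// l7RuntimeInfo is a stand-in for loadbalancers.L7RuntimeInfo; only the
// fields the filter touches are included.
type l7RuntimeInfo struct {
    Name           string
    IsMultiCluster bool
}

func main() {
    lbs := []*l7RuntimeInfo{
        {Name: "default/web", IsMultiCluster: false},
        {Name: "default/global-web", IsMultiCluster: true},
    }
    // Multi-cluster loadbalancers still get instance groups and named ports
    // from the shared path, but no L7 resources (URL map, target proxy,
    // forwarding rule) in this cluster, so they are dropped before the L7 sync.
    singleClusterLbs := []*l7RuntimeInfo{}
    for _, lb := range lbs {
        if !lb.IsMultiCluster {
            singleClusterLbs = append(singleClusterLbs, lb)
        }
    }
    for _, lb := range singleClusterLbs {
        fmt.Println("syncing L7 resources for", lb.Name) // prints only default/web
    }
}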
92 changes: 41 additions & 51 deletions controllers/gce/controller/controller.go
@@ -36,6 +36,7 @@ import (
     "k8s.io/client-go/tools/cache"
     "k8s.io/client-go/tools/record"

+    "k8s.io/ingress/controllers/gce/backends"
     "k8s.io/ingress/controllers/gce/loadbalancers"
 )

@@ -279,7 +280,13 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
     if err != nil {
        return err
     }
+    singleClusterIngresses, err := lbc.ingLister.ListGCEIngresses()
+    if err != nil {
+        return err
+    }
+
     nodePorts := lbc.tr.toNodePorts(&ingresses)
+    singleClusterNodePorts := lbc.tr.toNodePorts(&singleClusterIngresses)
     lbNames := lbc.ingLister.Store.ListKeys()
     lbs, err := lbc.ListRuntimeInfo()
     if err != nil {
@@ -313,16 +320,9 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
        glog.V(3).Infof("Finished syncing %v", key)
     }()

-    if ingExists {
-        ing := obj.(*extensions.Ingress)
-        if isGCEMultiClusterIngress(ing) {
-            return lbc.syncMultiClusterIngress(ing, nodeNames)
-        }
-    }
-
     // Record any errors during sync and throw a single error at the end. This
     // allows us to free up associated cloud resources ASAP.
-    if err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, nodePorts); err != nil {
+    if err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, singleClusterNodePorts, nodePorts); err != nil {
        // TODO: Implement proper backoff for the queue.
        eventMsg := "GCE"
        if ingExists {
@@ -336,59 +336,48 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
     if !ingExists {
        return syncError
     }
+    ing := obj.(*extensions.Ingress)
+    if isGCEMultiClusterIngress(ing) {
+        // Add instance group names as annotation on the ingress.
+        if ing.Annotations == nil {
+            ing.Annotations = map[string]string{}
+        }
+        // Since we just created instance groups in Checkpoint, calling create
+        // instance groups again should just return names of the existing
+        // instance groups. It does not matter which nodePort we pass as argument.
+        igs, err := lbc.CloudClusterManager.CreateInstanceGroups([]backends.ServicePort{nodePorts[0]})
+        if err != nil {
+            return fmt.Errorf("error in creating instance groups: %v", err)
+        }
+        err = setInstanceGroupsAnnotation(ing.Annotations, igs)
+        if err != nil {
+            return err
+        }
+        if err := lbc.updateAnnotations(ing.Name, ing.Namespace, ing.Annotations); err != nil {
+            return err
+        }
+        return nil
+    }
+
     // Update the UrlMap of the single loadbalancer that came through the watch.
     l7, err := lbc.CloudClusterManager.l7Pool.Get(key)
     if err != nil {
        syncError = fmt.Errorf("%v, unable to get loadbalancer: %v", syncError, err)
        return syncError
     }

-    ing := *obj.(*extensions.Ingress)
-    if urlMap, err := lbc.tr.toURLMap(&ing); err != nil {
+    if urlMap, err := lbc.tr.toURLMap(ing); err != nil {
        syncError = fmt.Errorf("%v, convert to url map error %v", syncError, err)
     } else if err := l7.UpdateUrlMap(urlMap); err != nil {
-        lbc.recorder.Eventf(&ing, apiv1.EventTypeWarning, "UrlMap", err.Error())
+        lbc.recorder.Eventf(ing, apiv1.EventTypeWarning, "UrlMap", err.Error())
        syncError = fmt.Errorf("%v, update url map error: %v", syncError, err)
-    } else if err := lbc.updateIngressStatus(l7, ing); err != nil {
-        lbc.recorder.Eventf(&ing, apiv1.EventTypeWarning, "Status", err.Error())
+    } else if err := lbc.updateIngressStatus(l7, *ing); err != nil {
+        lbc.recorder.Eventf(ing, apiv1.EventTypeWarning, "Status", err.Error())
        syncError = fmt.Errorf("%v, update ingress error: %v", syncError, err)
     }
     return syncError
 }

-func (lbc *LoadBalancerController) syncMultiClusterIngress(ing *extensions.Ingress, nodeNames []string) error {
-    // For multi cluster ingress, we only need to manage the instance groups and named ports on those instance groups.
-
-    // Ensure that all the required instance groups exist with the required node ports.
-    nodePorts := lbc.tr.ingressToNodePorts(ing)
-    // Add the default backend node port.
-    nodePorts = append(nodePorts, lbc.CloudClusterManager.defaultBackendNodePort)
-    igs, err := lbc.CloudClusterManager.CreateInstanceGroups(nodePorts)
-    if err != nil {
-        return err
-    }
-
-    // Ensure that instance groups have the right nodes.
-    // This is also done whenever a node is added or removed from the cluster.
-    // We need it here as well since instance group is not created until first ingress is observed.
-    if err := lbc.CloudClusterManager.SyncNodesInInstanceGroups(nodeNames); err != nil {
-        return err
-    }
-
-    // Add instance group names as annotation on the ingress.
-    if ing.Annotations == nil {
-        ing.Annotations = map[string]string{}
-    }
-    err = setInstanceGroupsAnnotation(ing.Annotations, igs)
-    if err != nil {
-        return err
-    }
-    if err := lbc.updateAnnotations(ing.Name, ing.Namespace, ing.Annotations); err != nil {
-        return err
-    }
-    return nil
-}
-
 // updateIngressStatus updates the IP and annotations of a loadbalancer.
 // The annotations are parsed by kubectl describe.
 func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing extensions.Ingress) error {
@@ -469,11 +458,12 @@ func (lbc *LoadBalancerController) ListRuntimeInfo() (lbs []*loadbalancers.L7Run
     }

     lbs = append(lbs, &loadbalancers.L7RuntimeInfo{
-        Name:         k,
-        TLS:          tls,
-        TLSName:      annotations.useNamedTLS(),
-        AllowHTTP:    annotations.allowHTTP(),
-        StaticIPName: annotations.staticIPName(),
+        Name:           k,
+        TLS:            tls,
+        TLSName:        annotations.useNamedTLS(),
+        AllowHTTP:      annotations.allowHTTP(),
+        StaticIPName:   annotations.staticIPName(),
+        IsMultiCluster: isGCEMultiClusterIngress(&ing),
     })
 }
 return lbs, nil
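The Checkpoint call above is the crux of the merged sync: ports from GCE-only (single-cluster) ingresses become backendServicePorts, since BackendServices are needed only where this cluster terminates the loadbalancer, while ports from all ingresses, multi-cluster included, become namedPorts to open on the instance groups. A sketch of that split, with ServicePort reduced to a hypothetical stand-in type:

package main

import "fmt"

type servicePort struct{ Port int64 }

// checkpoint mirrors the parameter contract of the new Checkpoint (sketch
// only): the first list drives BackendService sync, the second drives the
// named ports opened on instance groups.
func checkpoint(backendServicePorts, namedPorts []servicePort) {
    fmt.Printf("sync %d BackendServices, open %d named ports\n",
        len(backendServicePorts), len(namedPorts))
}

func main() {
    singleClusterNodePorts := []servicePort{{Port: 30080}}
    // All node ports, including one referenced only by a multi-cluster ingress.
    nodePorts := []servicePort{{Port: 30080}, {Port: 30090}}
    checkpoint(singleClusterNodePorts, nodePorts) // sync 1, open 2
}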
24 changes: 24 additions & 0 deletions controllers/gce/controller/utils.go
@@ -302,6 +302,17 @@ func ListAll(store cache.Store, selector labels.Selector, appendFn cache.AppendF

 // List lists all Ingress' in the store.
 func (s *StoreToIngressLister) List() (ing extensions.IngressList, err error) {
+    for _, m := range s.Store.List() {
+        newIng := m.(*extensions.Ingress)
+        if isGCEIngress(newIng) || isGCEMultiClusterIngress(newIng) {
+            ing.Items = append(ing.Items, *newIng)
+        }
+    }
+    return ing, nil
+}
+
+// ListGCEIngresses lists all GCE Ingress' in the store.
+func (s *StoreToIngressLister) ListGCEIngresses() (ing extensions.IngressList, err error) {
     for _, m := range s.Store.List() {
        newIng := m.(*extensions.Ingress)
        if isGCEIngress(newIng) {
@@ -680,3 +691,16 @@ func setInstanceGroupsAnnotation(existing map[string]string, igs []*compute.Inst
     existing[instanceGroupsAnnotationKey] = string(jsonValue)
     return nil
 }
+
+// uniq returns an array of unique service ports from the given array.
+func uniq(nodePorts []backends.ServicePort) []backends.ServicePort {
+    portMap := map[int64]backends.ServicePort{}
+    for _, p := range nodePorts {
+        portMap[p.Port] = p
+    }
+    nodePorts = []backends.ServicePort{}
+    for _, sp := range portMap {
+        nodePorts = append(nodePorts, sp)
+    }
+    return nodePorts
+}
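The new uniq helper extracts the dedup loop that previously lived inline in Checkpoint. It rebuilds the slice from a map keyed on Port, so duplicates collapse (the last entry per port wins) and output order is not guaranteed. A self-contained sketch of the same logic, with backends.ServicePort reduced to a stand-in carrying only the Port key:

package main

import "fmt"

type servicePort struct{ Port int64 }

// uniq, as in utils.go above: one entry per distinct Port, order unspecified.
func uniq(nodePorts []servicePort) []servicePort {
    portMap := map[int64]servicePort{}
    for _, p := range nodePorts {
        portMap[p.Port] = p
    }
    nodePorts = []servicePort{}
    for _, sp := range portMap {
        nodePorts = append(nodePorts, sp)
    }
    return nodePorts
}

func main() {
    ports := []servicePort{{Port: 30080}, {Port: 30080}, {Port: 30443}}
    fmt.Println(len(uniq(ports))) // 2: the duplicate 30080 collapses
}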
4 changes: 2 additions & 2 deletions controllers/gce/controller/utils_test.go
@@ -300,8 +300,8 @@ func TestAddInstanceGroupsAnnotation(t *testing.T) {
        if err != nil {
            t.Fatalf("Unexpected error: %v", err)
        }
-        if annotations[instanceGroupsKey] != c.ExpectedAnnotation {
-            t.Fatalf("Unexpected annotation value: %s, expected: %s", annotations[instanceGroupsKey], c.ExpectedAnnotation)
+        if annotations[instanceGroupsAnnotationKey] != c.ExpectedAnnotation {
+            t.Fatalf("Unexpected annotation value: %s, expected: %s", annotations[instanceGroupsAnnotationKey], c.ExpectedAnnotation)
        }
     }
 }
2 changes: 2 additions & 0 deletions controllers/gce/loadbalancers/loadbalancers.go
@@ -255,6 +255,8 @@ type L7RuntimeInfo struct {
     // The name of a Global Static IP. If specified, the IP associated with
     // this name is used in the Forwarding Rules for this loadbalancer.
     StaticIPName string
+    // IsMultiCluster is true if the loadbalancer is spread across multiple clusters.
+    IsMultiCluster bool
 }

 // String returns the load balancer name
