diff --git a/controllers/gce/backends/backends.go b/controllers/gce/backends/backends.go index d46d68d102..8e8dd3706e 100644 --- a/controllers/gce/backends/backends.go +++ b/controllers/gce/backends/backends.go @@ -217,7 +217,7 @@ func (b *Backends) Add(p ServicePort) error { be := &compute.BackendService{} defer func() { b.snapshotter.Add(portKey(p.Port), be) }() - igs, namedPort, err := b.nodePool.AddInstanceGroup(b.namer.IGName(), p.Port) + igs, namedPort, err := instances.CreateInstanceGroups(b.nodePool, b.namer, p.Port) if err != nil { return err } diff --git a/controllers/gce/controller/cluster_manager.go b/controllers/gce/controller/cluster_manager.go index 33dc616d42..4a7677c41c 100644 --- a/controllers/gce/controller/cluster_manager.go +++ b/controllers/gce/controller/cluster_manager.go @@ -26,6 +26,8 @@ import ( "github.com/golang/glog" + compute "google.golang.org/api/compute/v1" + "k8s.io/kubernetes/pkg/cloudprovider" gce "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" @@ -141,7 +143,7 @@ func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeName if err := c.backendPool.Sync(nodePorts); err != nil { return err } - if err := c.instancePool.Sync(nodeNames); err != nil { + if err := c.SyncNodesInInstanceGroups(nodeNames); err != nil { return err } if err := c.l7Pool.Sync(lbs); err != nil { @@ -170,6 +172,25 @@ func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeName return nil } +func (c *ClusterManager) CreateInstanceGroups(servicePorts []backends.ServicePort) ([]*compute.InstanceGroup, error) { + var igs []*compute.InstanceGroup + var err error + for _, p := range servicePorts { + igs, _, err = instances.CreateInstanceGroups(c.instancePool, c.ClusterNamer, p.Port) + if err != nil { + return nil, err + } + } + return igs, nil +} + +func (c *ClusterManager) SyncNodesInInstanceGroups(nodeNames []string) error { + if err := c.instancePool.Sync(nodeNames); err != nil { + return err + } + return nil +} + // GC garbage collects unused resources. // - lbNames are the names of L7 loadbalancers we wish to exist. Those not in // this list are removed from the cloud. diff --git a/controllers/gce/controller/controller.go b/controllers/gce/controller/controller.go index ece3882310..d18d16b37b 100644 --- a/controllers/gce/controller/controller.go +++ b/controllers/gce/controller/controller.go @@ -111,7 +111,7 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, clusterManager * pathHandlers := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { addIng := obj.(*extensions.Ingress) - if !isGCEIngress(addIng) { + if !isGCEIngress(addIng) && !isGCEMultiClusterIngress(addIng) { glog.Infof("Ignoring add for ingress %v based on annotation %v", addIng.Name, ingressClassKey) return } @@ -120,7 +120,7 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, clusterManager * }, DeleteFunc: func(obj interface{}) { delIng := obj.(*extensions.Ingress) - if !isGCEIngress(delIng) { + if !isGCEIngress(delIng) && !isGCEMultiClusterIngress(delIng) { glog.Infof("Ignoring delete for ingress %v based on annotation %v", delIng.Name, ingressClassKey) return } @@ -129,7 +129,7 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, clusterManager * }, UpdateFunc: func(old, cur interface{}) { curIng := cur.(*extensions.Ingress) - if !isGCEIngress(curIng) { + if !isGCEIngress(curIng) && !isGCEMultiClusterIngress(curIng) { return } if !reflect.DeepEqual(old, cur) { @@ -306,6 +306,13 @@ func (lbc *LoadBalancerController) sync(key string) (err error) { glog.V(3).Infof("Finished syncing %v", key) }() + if ingExists { + ing := obj.(*extensions.Ingress) + if isGCEMultiClusterIngress(ing) { + return lbc.syncMultiClusterIngress(ing, nodeNames) + } + } + // Record any errors during sync and throw a single error at the end. This // allows us to free up associated cloud resources ASAP. if err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, nodePorts); err != nil { @@ -342,6 +349,36 @@ func (lbc *LoadBalancerController) sync(key string) (err error) { return syncError } +func (lbc *LoadBalancerController) syncMultiClusterIngress(ing *extensions.Ingress, nodeNames []string) error { + // For multi cluster ingress, we only need to manage the instance groups and named ports on those instance groups. + + // Ensure that all the required instance groups exist with the required node ports. + nodePorts := lbc.tr.ingressToNodePorts(ing) + // Add the default backend node port. + nodePorts = append(nodePorts, lbc.CloudClusterManager.defaultBackendNodePort) + igs, err := lbc.CloudClusterManager.CreateInstanceGroups(nodePorts) + if err != nil { + return err + } + + // Ensure that instance groups have the right nodes. + // This is also done whenever a node is added or removed from the cluster. + // We need it here as well since instance group is not created until first ingress is observed. + if err := lbc.CloudClusterManager.SyncNodesInInstanceGroups(nodeNames); err != nil { + return err + } + + // Add instance group names as annotation on the ingress. + annotations, err := addInstanceGroupsAnnotation(ing.Annotations, igs) + if err != nil { + return err + } + if err := lbc.updateAnnotations(ing.Name, ing.Namespace, annotations); err != nil { + return err + } + return nil +} + // updateIngressStatus updates the IP and annotations of a loadbalancer. // The annotations are parsed by kubectl describe. func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing extensions.Ingress) error { @@ -372,14 +409,23 @@ func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing lbc.recorder.Eventf(currIng, api_v1.EventTypeNormal, "CREATE", "ip: %v", ip) } } + annotations := loadbalancers.GetLBAnnotations(l7, currIng.Annotations, lbc.CloudClusterManager.backendPool) + if err := lbc.updateAnnotations(ing.Name, ing.Namespace, annotations); err != nil { + return err + } + return nil +} + +func (lbc *LoadBalancerController) updateAnnotations(name, namespace string, annotations map[string]string) error { // Update annotations through /update endpoint - currIng, err = ingClient.Get(ing.Name, metav1.GetOptions{}) + ingClient := lbc.client.Extensions().Ingresses(namespace) + currIng, err := ingClient.Get(name, metav1.GetOptions{}) if err != nil { return err } - currIng.Annotations = loadbalancers.GetLBAnnotations(l7, currIng.Annotations, lbc.CloudClusterManager.backendPool) - if !reflect.DeepEqual(ing.Annotations, currIng.Annotations) { - glog.V(3).Infof("Updating annotations of %v/%v", ing.Namespace, ing.Name) + if !reflect.DeepEqual(currIng.Annotations, annotations) { + glog.V(3).Infof("Updating annotations of %v/%v", namespace, name) + currIng.Annotations = annotations if _, err := ingClient.Update(currIng); err != nil { return err } diff --git a/controllers/gce/controller/utils.go b/controllers/gce/controller/utils.go index 72ba593a8f..cc6d0b1d74 100644 --- a/controllers/gce/controller/utils.go +++ b/controllers/gce/controller/utils.go @@ -26,6 +26,7 @@ import ( "github.com/golang/glog" compute "google.golang.org/api/compute/v1" + api_v1 "k8s.io/api/core/v1" extensions "k8s.io/api/extensions/v1beta1" "k8s.io/apimachinery/pkg/api/meta" @@ -75,12 +76,19 @@ const ( // ingressClassKey picks a specific "class" for the Ingress. The controller // only processes Ingresses with this annotation either unset, or set // to either gceIngessClass or the empty string. - ingressClassKey = "kubernetes.io/ingress.class" - gceIngressClass = "gce" + ingressClassKey = "kubernetes.io/ingress.class" + gceIngressClass = "gce" + gceMultiIngressClass = "gce-multi" // Label key to denote which GCE zone a Kubernetes node is in. zoneKey = "failure-domain.beta.kubernetes.io/zone" defaultZone = "" + + // instanceGroupsKey is the annotation key used by controller to specify the + // name and zone of instance groups created for the ingress. + // This is read only for users. Controller will overrite any user updates. + // This is only set for ingresses with ingressClass = "gce-multi" + instanceGroupsKey = "ingress.gcp.kubernetes.io/instance-groups" ) // ingAnnotations represents Ingress annotations. @@ -156,6 +164,13 @@ func isGCEIngress(ing *extensions.Ingress) bool { return class == "" || class == gceIngressClass } +// isGCEMultiClusterIngress returns true if the given Ingress has +// ingress.class annotation set to "gce-multi". +func isGCEMultiClusterIngress(ing *extensions.Ingress) bool { + class := ingAnnotations(ing.ObjectMeta.Annotations).ingressClass() + return class == gceMultiIngressClass +} + // errorNodePortNotFound is an implementation of error. type errorNodePortNotFound struct { backend extensions.IngressBackend @@ -471,32 +486,39 @@ PortLoop: return p, nil } -// toNodePorts converts a pathlist to a flat list of nodeports. +// toNodePorts is a helper method over ingressToNodePorts to process a list of ingresses. func (t *GCETranslator) toNodePorts(ings *extensions.IngressList) []backends.ServicePort { var knownPorts []backends.ServicePort for _, ing := range ings.Items { - defaultBackend := ing.Spec.Backend - if defaultBackend != nil { - port, err := t.getServiceNodePort(*defaultBackend, ing.Namespace) + knownPorts = append(knownPorts, t.ingressToNodePorts(&ing)...) + } + return knownPorts +} + +// ingressToNodePorts converts a pathlist to a flat list of nodeports for the given ingress. +func (t *GCETranslator) ingressToNodePorts(ing *extensions.Ingress) []backends.ServicePort { + var knownPorts []backends.ServicePort + defaultBackend := ing.Spec.Backend + if defaultBackend != nil { + port, err := t.getServiceNodePort(*defaultBackend, ing.Namespace) + if err != nil { + glog.Infof("%v", err) + } else { + knownPorts = append(knownPorts, port) + } + } + for _, rule := range ing.Spec.Rules { + if rule.HTTP == nil { + glog.Errorf("ignoring non http Ingress rule") + return knownPorts + } + for _, path := range rule.HTTP.Paths { + port, err := t.getServiceNodePort(path.Backend, ing.Namespace) if err != nil { glog.Infof("%v", err) - } else { - knownPorts = append(knownPorts, port) - } - } - for _, rule := range ing.Spec.Rules { - if rule.HTTP == nil { - glog.Errorf("ignoring non http Ingress rule") - continue - } - for _, path := range rule.HTTP.Paths { - port, err := t.getServiceNodePort(path.Backend, ing.Namespace) - if err != nil { - glog.Infof("%v", err) - continue - } - knownPorts = append(knownPorts, port) + return knownPorts } + knownPorts = append(knownPorts, port) } } return knownPorts @@ -640,3 +662,23 @@ func (o PodsByCreationTimestamp) Less(i, j int) bool { } return o[i].CreationTimestamp.Before(o[j].CreationTimestamp) } + +func addInstanceGroupsAnnotation(existing map[string]string, igs []*compute.InstanceGroup) (map[string]string, error) { + if existing == nil { + existing = map[string]string{} + } + type Value struct { + Name string + Zone string + } + instanceGroups := []Value{} + for _, ig := range igs { + instanceGroups = append(instanceGroups, Value{Name: ig.Name, Zone: ig.Zone}) + } + jsonValue, err := json.Marshal(instanceGroups) + if err != nil { + return existing, err + } + existing[instanceGroupsKey] = string(jsonValue) + return existing, nil +} diff --git a/controllers/gce/controller/utils_test.go b/controllers/gce/controller/utils_test.go index 61438f607d..7c23ebe2da 100644 --- a/controllers/gce/controller/utils_test.go +++ b/controllers/gce/controller/utils_test.go @@ -21,6 +21,8 @@ import ( "testing" "time" + compute "google.golang.org/api/compute/v1" + api_v1 "k8s.io/api/core/v1" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" @@ -263,3 +265,19 @@ func addNodes(lbc *LoadBalancerController, zoneToNode map[string][]string) { func getProbePath(p *api_v1.Probe) string { return p.Handler.HTTPGet.Path } + +func TestAddInstanceGroupsAnnotation(t *testing.T) { + igs := []*compute.InstanceGroup{&compute.InstanceGroup{ + Name: "ig-name", + Zone: "https://www.googleapis.com/compute/v1/projects/my-project/zones/us-central1-b", + }, + } + expectedAnnotation := `[{"Name":"ig-name","Zone":"https://www.googleapis.com/compute/v1/projects/my-project/zones/us-central1-b"}]` + annotations, err := addInstanceGroupsAnnotation(nil, igs) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if annotations[instanceGroupsKey] != expectedAnnotation { + t.Fatalf("Unexpected annotation value: %s, expected: %s", annotations[instanceGroupsKey], expectedAnnotation) + } +} diff --git a/controllers/gce/instances/utils.go b/controllers/gce/instances/utils.go new file mode 100644 index 0000000000..4bd0110818 --- /dev/null +++ b/controllers/gce/instances/utils.go @@ -0,0 +1,13 @@ +package instances + +import ( + compute "google.golang.org/api/compute/v1" + + "k8s.io/ingress/controllers/gce/utils" +) + +// Helper method to create instance groups. +// This method exists to ensure that we are using the same logic at all places. +func CreateInstanceGroups(nodePool NodePool, namer *utils.Namer, port int64) ([]*compute.InstanceGroup, *compute.NamedPort, error) { + return nodePool.AddInstanceGroup(namer.IGName(), port) +}