Skip to content

Commit

Permalink
Integrate ClusterServiceMapper into translator.
Browse files Browse the repository at this point in the history
  • Loading branch information
rramkumar1 committed Apr 18, 2018
1 parent 69e9e55 commit d15f36f
Show file tree
Hide file tree
Showing 12 changed files with 231 additions and 288 deletions.
7 changes: 4 additions & 3 deletions pkg/backends/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func ServicePorts(backendToService map[v1beta1.IngressBackend]v1.Service) (map[v
svcPort, err := servicePort(ib, svc)
if err != nil {
result = multierror.Append(result, err)
continue
}
backendToServicePort[ib] = svcPort
}
Expand All @@ -49,7 +50,7 @@ func ServicePorts(backendToService map[v1beta1.IngressBackend]v1.Service) (map[v
func servicePort(ib v1beta1.IngressBackend, svc v1.Service) (ServicePort, error) {
// If service is not of type NodePort, return an error.
if svc.Spec.Type != v1.ServiceTypeNodePort {
return ServicePort{}, fmt.Errorf("service %v is type %v, expected type NodePort", svc.Name, svc.Spec.Type)
return ServicePort{}, fmt.Errorf("service %v/%v for backend %+v is type %v, expected type NodePort", svc.Namespace, svc.Name, ib, svc.Spec.Type)
}
appProtocols, err := annotations.FromService(&svc).ApplicationProtocols()
if err != nil {
Expand All @@ -75,12 +76,12 @@ PortLoop:
}

if port == nil {
return ServicePort{}, fmt.Errorf("could not find matching port for backend %+v and service %s/%s. Looking for port %+v in %v", ib, svc.Namespace, ib.ServiceName, ib.ServicePort, svc.Spec.Ports)
return ServicePort{}, fmt.Errorf("could not find matching port on service %v/%v for backend %+v. Looking for port %+v in %v", svc.Namespace, ib.ServiceName, ib, ib.ServicePort, svc.Spec.Ports)
}

proto := annotations.ProtocolHTTP
if protoStr, exists := appProtocols[port.Name]; exists {
glog.V(2).Infof("service %s/%s, port %q: using protocol to %q", svc.Namespace, ib.ServiceName, port, protoStr)
glog.V(2).Infof("service %v/%v, port %q: using protocol to %q", svc.Namespace, ib.ServiceName, port, protoStr)
proto = annotations.AppProtocol(protoStr)
}

Expand Down
49 changes: 43 additions & 6 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
unversionedcore "k8s.io/client-go/kubernetes/typed/core/v1"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"

multierror "github.com/hashicorp/go-multierror"
"k8s.io/client-go/tools/record"

"k8s.io/ingress-gce/pkg/annotations"
Expand All @@ -39,6 +41,7 @@ import (
"k8s.io/ingress-gce/pkg/controller/translator"
"k8s.io/ingress-gce/pkg/firewalls"
"k8s.io/ingress-gce/pkg/loadbalancers"
"k8s.io/ingress-gce/pkg/mapper"
"k8s.io/ingress-gce/pkg/tls"
"k8s.io/ingress-gce/pkg/utils"
)
Expand Down Expand Up @@ -165,12 +168,15 @@ func NewLoadBalancerController(ctx *context.ControllerContext, clusterManager *C
if ctx.EndpointInformer != nil {
endpointIndexer = ctx.EndpointInformer.GetIndexer()
}
svcGetter := utils.SvcGetter{Store: ctx.ServiceInformer.GetStore()}
lbc.Translator = translator.New(lbc.ctx, lbc.CloudClusterManager.ClusterNamer,
ctx.ServiceInformer.GetIndexer(),
ctx.NodeInformer.GetIndexer(),
ctx.PodInformer.GetIndexer(),
endpointIndexer,
mapper.NewClusterServiceMapper(svcGetter.Get, nil),
negEnabled)

lbc.tlsLoader = &tls.TLSCertsFromSecretsLoader{Client: lbc.ctx.KubeClient}

glog.V(3).Infof("Created new loadbalancer controller")
Expand Down Expand Up @@ -242,7 +248,14 @@ func (lbc *LoadBalancerController) sync(key string) (retErr error) {
}

// gceNodePorts contains the ServicePorts used by only single-cluster ingress.
gceNodePorts := lbc.Translator.ToNodePorts(&gceIngresses)
var gceNodePorts []backends.ServicePort
for _, gceIngress := range gceIngresses.Items {
svcPortMapping, err := lbc.Translator.ServicePortMapping(&gceIngress)
if err != nil {
glog.Infof("%v", err.Error())
}
gceNodePorts = append(gceNodePorts, extractSvcPorts(svcPortMapping)...)
}
nodeNames, err := getReadyNodeNames(listers.NewNodeLister(lbc.nodeLister))
if err != nil {
return err
Expand Down Expand Up @@ -282,7 +295,24 @@ func (lbc *LoadBalancerController) sync(key string) (retErr error) {
}

func (lbc *LoadBalancerController) ensureIngress(key string, ing *extensions.Ingress, nodeNames []string, gceNodePorts []backends.ServicePort) error {
ingNodePorts := lbc.Translator.IngressToNodePorts(ing)
// Given an ingress, returns a mapping of IngressBackend -> ServicePort
svcPortMapping, err := lbc.Translator.ServicePortMapping(ing)
if err != nil {
// TODO(rramkumar): Clean this up, it's very ugly.
switch err.(type) {
case *multierror.Error:
// Emit an event for each error in the multierror.
merr := err.(*multierror.Error)
for _, e := range merr.Errors {
msg := fmt.Sprintf("%v", e)
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Service", msg)
}
default:
msg := fmt.Sprintf("%v", err)
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Service", msg)
}
}
ingNodePorts := extractSvcPorts(svcPortMapping)
igs, err := lbc.CloudClusterManager.EnsureInstanceGroupsAndPorts(nodeNames, ingNodePorts)
if err != nil {
return err
Expand Down Expand Up @@ -345,17 +375,17 @@ func (lbc *LoadBalancerController) ensureIngress(key string, ing *extensions.Ing
return fmt.Errorf("unable to get loadbalancer: %v", err)
}

urlMap, err := lbc.Translator.ToURLMap(ing)
urlMap, err := lbc.Translator.ToURLMap(ing, svcPortMapping)
if err != nil {
return fmt.Errorf("convert to URL Map error %v", err)
return fmt.Errorf("error converting to URLMap: %v", err)
}

if err := l7.UpdateUrlMap(urlMap); err != nil {
return fmt.Errorf("update URL Map error: %v", err)
return fmt.Errorf("error updating URLMap: %v", err)
}

if err := lbc.updateIngressStatus(l7, ing); err != nil {
return fmt.Errorf("update ingress status error: %v", err)
return fmt.Errorf("error updating ingress status: %v", err)
}

return nil
Expand Down Expand Up @@ -441,3 +471,10 @@ func updateAnnotations(client kubernetes.Interface, name, namespace string, anno
}
return nil
}

func extractSvcPorts(svcPortMapping map[extensions.IngressBackend]backends.ServicePort) (svcPorts []backends.ServicePort) {
for _, svcPort := range svcPortMapping {
svcPorts = append(svcPorts, svcPort)
}
return svcPorts
}
42 changes: 27 additions & 15 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,30 +178,42 @@ func newPortManager(st, end int, namer *utils.Namer) *nodePortManager {
return &nodePortManager{map[string]int{}, st, end, namer}
}

// service is a helper function to create k8s service object.
func service(ib *extensions.IngressBackend, namespace string, pm *nodePortManager) *api_v1.Service {
svc := &api_v1.Service{
ObjectMeta: meta_v1.ObjectMeta{
Name: ib.ServiceName,
Namespace: namespace,
},
}
var svcPort api_v1.ServicePort
switch ib.ServicePort.Type {
case intstr.Int:
svcPort = api_v1.ServicePort{Port: ib.ServicePort.IntVal}
default:
svcPort = api_v1.ServicePort{Name: ib.ServicePort.StrVal}
}
svcPort.NodePort = int32(pm.getNodePort(ib.ServiceName))
svc.Spec.Ports = []api_v1.ServicePort{svcPort}
svc.Spec.Type = api_v1.ServiceTypeNodePort
return svc
}

// addIngress adds an ingress to the loadbalancer controllers ingress store. If
// a nodePortManager is supplied, it also adds all backends to the service store
// with a nodePort acquired through it.
func addIngress(lbc *LoadBalancerController, ing *extensions.Ingress, pm *nodePortManager) {
for _, rule := range ing.Spec.Rules {
for _, path := range rule.HTTP.Paths {
svc := &api_v1.Service{
ObjectMeta: meta_v1.ObjectMeta{
Name: path.Backend.ServiceName,
Namespace: ing.Namespace,
},
}
var svcPort api_v1.ServicePort
switch path.Backend.ServicePort.Type {
case intstr.Int:
svcPort = api_v1.ServicePort{Port: path.Backend.ServicePort.IntVal}
default:
svcPort = api_v1.ServicePort{Name: path.Backend.ServicePort.StrVal}
}
svcPort.NodePort = int32(pm.getNodePort(path.Backend.ServiceName))
svc.Spec.Ports = []api_v1.ServicePort{svcPort}
svc := service(&path.Backend, ing.Namespace, pm)
lbc.ctx.ServiceInformer.GetIndexer().Add(svc)
}
}
if ing.Spec.Backend != nil {
defaultBackend := ing.Spec.Backend
defaultBackendSvc := service(defaultBackend, ing.Namespace, pm)
lbc.ctx.ServiceInformer.GetIndexer().Add(defaultBackendSvc)
}
lbc.ctx.KubeClient.Extensions().Ingresses(ing.Namespace).Create(ing)
lbc.ctx.IngressInformer.GetIndexer().Add(ing)
}
Expand Down
46 changes: 0 additions & 46 deletions pkg/controller/errors/errors.go

This file was deleted.

Loading

0 comments on commit d15f36f

Please sign in to comment.