Skip to content

Commit

Permalink
Create Neg CRs when neg crd is enabled
Browse files Browse the repository at this point in the history
  - when EnableNegCrd flag is set, neg client is created
  - manager will create neg crs when creating syncers
  - manager will delete neg crs when deleting syncers
  • Loading branch information
swetharepakula committed Jul 7, 2020
1 parent 0a3935e commit 174a274
Show file tree
Hide file tree
Showing 7 changed files with 1,018 additions and 29 deletions.
12 changes: 10 additions & 2 deletions cmd/glbc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"k8s.io/client-go/tools/record"
backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned"
frontendconfigclient "k8s.io/ingress-gce/pkg/frontendconfig/client/clientset/versioned"
svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned"

ingctx "k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/controller"
Expand Down Expand Up @@ -126,11 +127,18 @@ func main() {
}
}

var svcNegClient svcnegclient.Interface
if flags.F.EnableNegCrd {
negCRDMeta := svcneg.CRDMeta()
if _, err := crdHandler.EnsureCRD(negCRDMeta); err != nil {
klog.Fatalf("Failed to ensure ServiceNetworkEndpointGroup CRD: %v", err)
}

svcNegClient, err = svcnegclient.NewForConfig(kubeConfig)
if err != nil {
klog.Fatalf("Failed to create NetworkEndpointGroup client: %v", err)
}

}

namer, err := app.NewNamer(kubeClient, flags.F.ClusterName, firewalls.DefaultFirewallName)
Expand Down Expand Up @@ -160,7 +168,7 @@ func main() {
ASMConfigMapNamespace: flags.F.ASMConfigMapBasedConfigNamespace,
ASMConfigMapName: flags.F.ASMConfigMapBasedConfigCMName,
}
ctx := ingctx.NewControllerContext(kubeConfig, kubeClient, backendConfigClient, frontendConfigClient, nil, cloud, namer, kubeSystemUID, ctxConfig)
ctx := ingctx.NewControllerContext(kubeConfig, kubeClient, backendConfigClient, frontendConfigClient, svcNegClient, cloud, namer, kubeSystemUID, ctxConfig)
go app.RunHTTPServer(ctx.HealthCheck)

if !flags.F.LeaderElection.LeaderElect {
Expand Down Expand Up @@ -253,7 +261,7 @@ func runControllers(ctx *ingctx.ControllerContext) {
}

// TODO: Refactor NEG to use cloud mocks so ctx.Cloud can be referenced within NewController.
negController := neg.NewController(negtypes.NewAdapter(ctx.Cloud), ctx, zoneGetter, ctx.ClusterNamer, flags.F.ResyncPeriod, flags.F.NegGCPeriod, flags.F.EnableReadinessReflector, flags.F.RunIngressController, flags.F.RunL4Controller)
negController := neg.NewController(negtypes.NewAdapter(ctx.Cloud), ctx, zoneGetter, ctx.ClusterNamer, flags.F.ResyncPeriod, flags.F.NegGCPeriod, flags.F.EnableReadinessReflector, flags.F.RunIngressController, flags.F.RunL4Controller, flags.F.EnableNegCrd)

go negController.Run(stopCh)
klog.V(0).Infof("negController started")
Expand Down
30 changes: 23 additions & 7 deletions pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
apimachinerytypes "k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
Expand Down Expand Up @@ -99,6 +100,9 @@ type Controller struct {

// runL4 indicates whether to run NEG controller that processes L4 ILB services
runL4 bool

// indicates whether neg crd have been enabled
enableNegCrd bool
}

// NewController returns a network endpoint group controller.
Expand All @@ -112,6 +116,7 @@ func NewController(
enableReadinessReflector bool,
runIngress bool,
runL4Controller bool,
enableNegCrd bool,
) *Controller {
// init event recorder
// TODO: move event recorder initializer to main. Reuse it among controllers.
Expand All @@ -123,7 +128,7 @@ func NewController(
recorder := eventBroadcaster.NewRecorder(scheme.Scheme,
apiv1.EventSource{Component: "neg-controller"})

manager := newSyncerManager(namer, recorder, cloud, zoneGetter, ctx.PodInformer.GetIndexer(), ctx.ServiceInformer.GetIndexer(), ctx.EndpointInformer.GetIndexer(), ctx.NodeInformer.GetIndexer())
manager := newSyncerManager(namer, recorder, cloud, zoneGetter, ctx.NegClient, ctx.PodInformer.GetIndexer(), ctx.ServiceInformer.GetIndexer(), ctx.EndpointInformer.GetIndexer(), ctx.NodeInformer.GetIndexer())
var reflector readiness.Reflector
if enableReadinessReflector {
reflector = readiness.NewReadinessReflector(ctx, manager)
Expand Down Expand Up @@ -151,6 +156,7 @@ func NewController(
reflector: reflector,
collector: ctx.ControllerMetrics,
runL4: runL4Controller,
enableNegCrd: enableNegCrd,
}

if runIngress {
Expand Down Expand Up @@ -376,8 +382,8 @@ func (c *Controller) processService(key string) error {
}
if !exists {
c.collector.DeleteNegService(key)
c.manager.StopSyncer(namespace, name)
return nil

return c.manager.StopSyncer(namespace, name)
}

service := obj.(*apiv1.Service)
Expand Down Expand Up @@ -431,9 +437,15 @@ func (c *Controller) processService(key string) error {
klog.V(4).Infof("Service %q does not need any NEG. Skipping", key)
c.collector.DeleteNegService(key)
// neg annotation is not found or NEG is not enabled
c.manager.StopSyncer(namespace, name)
var errList []error
if err = c.manager.StopSyncer(namespace, name); err != nil {
errList = append(errList, err)
}
// delete the annotation
return c.syncNegStatusAnnotation(namespace, name, make(negtypes.PortInfoMap))
if err = c.syncNegStatusAnnotation(namespace, name, make(negtypes.PortInfoMap)); err != nil {
errList = append(errList, err)
}
return utilerrors.NewAggregate(errList)
}

// mergeIngressPortInfo merges Ingress PortInfo into portInfoMap if the service has Enable Ingress annotation.
Expand Down Expand Up @@ -481,12 +493,16 @@ func (c *Controller) mergeStandaloneNEGsPortInfo(service *apiv1.Service, name ty
)
}

exposedNegSvcPort, _, err := negServicePorts(negAnnotation, knowSvcPortSet)
exposedNegSvcPort, customNames, err := negServicePorts(negAnnotation, knowSvcPortSet)
if err != nil {
return err
}

if err := portInfoMap.Merge(negtypes.NewPortInfoMap(name.Namespace, name.Name, exposedNegSvcPort, c.namer /*readinessGate*/, true, nil)); err != nil {
if !c.enableNegCrd {
customNames = nil
}

if err := portInfoMap.Merge(negtypes.NewPortInfoMap(name.Namespace, name.Name, exposedNegSvcPort, c.namer /*readinessGate*/, true, customNames)); err != nil {
return fmt.Errorf("failed to merge service ports exposed as standalone NEGs (%v) into ingress referenced service ports (%v): %v", exposedNegSvcPort, portInfoMap, err)
}
}
Expand Down
Loading

0 comments on commit 174a274

Please sign in to comment.