diff --git a/cmd/glbc/main.go b/cmd/glbc/main.go index 8172b1bacd..f79981fdec 100644 --- a/cmd/glbc/main.go +++ b/cmd/glbc/main.go @@ -38,6 +38,7 @@ import ( "k8s.io/client-go/tools/record" backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned" frontendconfigclient "k8s.io/ingress-gce/pkg/frontendconfig/client/clientset/versioned" + svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned" ingctx "k8s.io/ingress-gce/pkg/context" "k8s.io/ingress-gce/pkg/controller" @@ -126,11 +127,18 @@ func main() { } } + var svcNegClient svcnegclient.Interface if flags.F.EnableNegCrd { negCRDMeta := svcneg.CRDMeta() if _, err := crdHandler.EnsureCRD(negCRDMeta); err != nil { klog.Fatalf("Failed to ensure ServiceNetworkEndpointGroup CRD: %v", err) } + + svcNegClient, err = svcnegclient.NewForConfig(kubeConfig) + if err != nil { + klog.Fatalf("Failed to create NetworkEndpointGroup client: %v", err) + } + } namer, err := app.NewNamer(kubeClient, flags.F.ClusterName, firewalls.DefaultFirewallName) @@ -160,7 +168,7 @@ func main() { ASMConfigMapNamespace: flags.F.ASMConfigMapBasedConfigNamespace, ASMConfigMapName: flags.F.ASMConfigMapBasedConfigCMName, } - ctx := ingctx.NewControllerContext(kubeConfig, kubeClient, backendConfigClient, frontendConfigClient, nil, cloud, namer, kubeSystemUID, ctxConfig) + ctx := ingctx.NewControllerContext(kubeConfig, kubeClient, backendConfigClient, frontendConfigClient, svcNegClient, cloud, namer, kubeSystemUID, ctxConfig) go app.RunHTTPServer(ctx.HealthCheck) if !flags.F.LeaderElection.LeaderElect { @@ -253,7 +261,7 @@ func runControllers(ctx *ingctx.ControllerContext) { } // TODO: Refactor NEG to use cloud mocks so ctx.Cloud can be referenced within NewController. - negController := neg.NewController(negtypes.NewAdapter(ctx.Cloud), ctx, zoneGetter, ctx.ClusterNamer, flags.F.ResyncPeriod, flags.F.NegGCPeriod, flags.F.EnableReadinessReflector, flags.F.RunIngressController, flags.F.RunL4Controller) + negController := neg.NewController(negtypes.NewAdapter(ctx.Cloud), ctx, zoneGetter, ctx.ClusterNamer, flags.F.ResyncPeriod, flags.F.NegGCPeriod, flags.F.EnableReadinessReflector, flags.F.RunIngressController, flags.F.RunL4Controller, flags.F.EnableNegCrd) go negController.Run(stopCh) klog.V(0).Infof("negController started") diff --git a/pkg/neg/controller.go b/pkg/neg/controller.go index d0ba5ca629..2fb5fff06c 100644 --- a/pkg/neg/controller.go +++ b/pkg/neg/controller.go @@ -99,6 +99,9 @@ type Controller struct { // runL4 indicates whether to run NEG controller that processes L4 ILB services runL4 bool + + // indicates whether neg crd have been enabled + enableNegCrd bool } // NewController returns a network endpoint group controller. @@ -112,6 +115,7 @@ func NewController( enableReadinessReflector bool, runIngress bool, runL4Controller bool, + enableNegCrd bool, ) *Controller { // init event recorder // TODO: move event recorder initializer to main. Reuse it among controllers. 
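[Editor's aside, not part of the diff] In the wiring above, svcNegClient stays nil whenever EnableNegCrd is off, and the CR helpers introduced later in this change short-circuit on a nil client, so the pre-existing syncer path is untouched when the CRD is disabled. A minimal sketch of that guard, using a hypothetical crManager type rather than the real syncerManager:

package neg

import (
	svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned"
)

// crManager is a stand-in for syncerManager; only the nil-client guard is shown.
type crManager struct {
	svcNegClient svcnegclient.Interface // left nil when the NEG CRD flag is disabled
}

func (m *crManager) ensureCR() error {
	if m.svcNegClient == nil {
		return nil // CRD support disabled: nothing to create, update, or delete
	}
	// ... create or update the ServiceNetworkEndpointGroup CR here,
	// as ensureSvcNegCR does later in this change.
	return nil
}
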
@@ -123,7 +127,11 @@ func NewController( recorder := eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{Component: "neg-controller"}) - manager := newSyncerManager(namer, recorder, cloud, zoneGetter, ctx.PodInformer.GetIndexer(), ctx.ServiceInformer.GetIndexer(), ctx.EndpointInformer.GetIndexer(), ctx.NodeInformer.GetIndexer()) + var svcNegInformer cache.Indexer + if enableNegCrd { + svcNegInformer = ctx.SvcNegInformer.GetIndexer() + } + manager := newSyncerManager(namer, recorder, cloud, zoneGetter, ctx.SvcNegClient, ctx.PodInformer.GetIndexer(), ctx.ServiceInformer.GetIndexer(), ctx.EndpointInformer.GetIndexer(), ctx.NodeInformer.GetIndexer(), svcNegInformer) var reflector readiness.Reflector if enableReadinessReflector { reflector = readiness.NewReadinessReflector(ctx, manager) @@ -151,6 +159,7 @@ func NewController( reflector: reflector, collector: ctx.ControllerMetrics, runL4: runL4Controller, + enableNegCrd: enableNegCrd, } if runIngress { @@ -432,6 +441,7 @@ func (c *Controller) processService(key string) error { c.collector.DeleteNegService(key) // neg annotation is not found or NEG is not enabled c.manager.StopSyncer(namespace, name) + // delete the annotation return c.syncNegStatusAnnotation(namespace, name, make(negtypes.PortInfoMap)) } @@ -481,12 +491,19 @@ func (c *Controller) mergeStandaloneNEGsPortInfo(service *apiv1.Service, name ty ) } - exposedNegSvcPort, _, err := negServicePorts(negAnnotation, knowSvcPortSet) + exposedNegSvcPort, customNames, err := negServicePorts(negAnnotation, knowSvcPortSet) if err != nil { return err } - if err := portInfoMap.Merge(negtypes.NewPortInfoMap(name.Namespace, name.Name, exposedNegSvcPort, c.namer /*readinessGate*/, true, nil)); err != nil { + if !c.enableNegCrd && len(customNames) != 0 { + return fmt.Errorf("custom neg name specified in service (%s) but neg crd is not enabled", name.String()) + } + if negAnnotation.NEGEnabledForIngress() && len(customNames) != 0 { + return fmt.Errorf("configuration for negs in service (%s) is invalid, custom neg name cannot be used with ingress enabled", name.String()) + } + + if err := portInfoMap.Merge(negtypes.NewPortInfoMap(name.Namespace, name.Name, exposedNegSvcPort, c.namer /*readinessGate*/, true, customNames)); err != nil { return fmt.Errorf("failed to merge service ports exposed as standalone NEGs (%v) into ingress referenced service ports (%v): %v", exposedNegSvcPort, portInfoMap, err) } } diff --git a/pkg/neg/controller_test.go b/pkg/neg/controller_test.go index 882b396ccd..6f739721e7 100644 --- a/pkg/neg/controller_test.go +++ b/pkg/neg/controller_test.go @@ -33,6 +33,7 @@ import ( istioV1alpha3 "istio.io/api/networking/v1alpha3" apiv1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/api/networking/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -49,6 +50,8 @@ import ( "k8s.io/ingress-gce/pkg/context" "k8s.io/ingress-gce/pkg/flags" negtypes "k8s.io/ingress-gce/pkg/neg/types" + svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned" + negfake "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned/fake" "k8s.io/ingress-gce/pkg/utils" namer_util "k8s.io/ingress-gce/pkg/utils/namer" "k8s.io/legacy-cloud-providers/gce" @@ -75,6 +78,10 @@ var ( ) func newTestController(kubeClient kubernetes.Interface) *Controller { + return newTestControllerWithNegClient(kubeClient, nil) +} + +func newTestControllerWithNegClient(kubeClient kubernetes.Interface, svcNegClient svcnegclient.Interface) *Controller { 
backendConfigClient := backendconfigclient.NewSimpleClientset() namer := namer_util.NewNamer(ClusterID, "") dynamicSchema := runtime.NewScheme() @@ -90,7 +97,7 @@ func newTestController(kubeClient kubernetes.Interface) *Controller { ASMConfigMapNamespace: "kube-system", ASMConfigMapName: "ingress-controller-config-test", } - context := context.NewControllerContext(nil, kubeClient, backendConfigClient, nil, nil, gce.NewFakeGCECloud(gce.DefaultTestClusterValues()), namer, "" /*kubeSystemUID*/, ctxConfig) + context := context.NewControllerContext(nil, kubeClient, backendConfigClient, nil, svcNegClient, gce.NewFakeGCECloud(gce.DefaultTestClusterValues()), namer, "" /*kubeSystemUID*/, ctxConfig) // Hack the context.Init func. configMapInformer := informerv1.NewConfigMapInformer(kubeClient, context.Namespace, context.ResyncPeriod, utils.NewNamespaceIndexer()) @@ -105,6 +112,11 @@ func newTestController(kubeClient kubernetes.Interface) *Controller { context.DestinationRuleInformer = drDynamicInformer.Informer() context.DestinationRuleClient = dynamicClient.Resource(destrinationGVR) + enableNegCrd := false + if svcNegClient != nil { + enableNegCrd = true + } + controller := NewController( negtypes.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network"), context, @@ -116,6 +128,7 @@ func newTestController(kubeClient kubernetes.Interface) *Controller { false, true, false, + enableNegCrd, ) return controller } @@ -870,6 +883,147 @@ func TestMergeCSMPortInfoMap(t *testing.T) { } } +func TestEnableNegCRD(t *testing.T) { + t.Parallel() + + testCases := []struct { + desc string + exposedPortNames map[int32]string + expectNegPorts []int32 + svcNegClient svcnegclient.Interface + ingress bool + expectErr bool + }{ + { + desc: "No ingress, multiple ports all custom names", + exposedPortNames: map[int32]string{80: "neg-1", 443: "neg-2", 8081: "neg-3", 8080: "neg-4"}, + expectNegPorts: []int32{80, 443, 8081, 8080}, + svcNegClient: negfake.NewSimpleClientset(), + }, + { + desc: "No ingress, multiple ports, mix of custom names", + exposedPortNames: map[int32]string{80: "neg-1", 443: "", 8081: "neg-3"}, + expectNegPorts: []int32{80, 443, 8081}, + svcNegClient: negfake.NewSimpleClientset(), + }, + { + desc: "No ingress, one port, custom name", + exposedPortNames: map[int32]string{80: "neg-1"}, + expectNegPorts: []int32{80}, + svcNegClient: negfake.NewSimpleClientset(), + }, + { + desc: "No ingress, one port, custom name, neg crd is not enabled", + exposedPortNames: map[int32]string{80: "neg-1"}, + expectNegPorts: []int32{80}, + svcNegClient: nil, + expectErr: true, + }, + { + desc: "ingress, one port, custom name, neg crd is enabled", + exposedPortNames: map[int32]string{80: "neg-1"}, + expectNegPorts: []int32{80}, + svcNegClient: negfake.NewSimpleClientset(), + ingress: true, + expectErr: true, + }, + { + desc: "ingress, one port, neg crd is enabled", + exposedPortNames: map[int32]string{80: ""}, + expectNegPorts: []int32{80}, + svcNegClient: negfake.NewSimpleClientset(), + ingress: true, + expectErr: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + controller := newTestControllerWithNegClient(fake.NewSimpleClientset(), tc.svcNegClient) + manager := controller.manager.(*syncerManager) + defer controller.stop() + svcKey := utils.ServiceKeyFunc(testServiceNamespace, testServiceName) + service := newTestServiceCustomNamedNeg(controller, tc.exposedPortNames, tc.ingress) + controller.serviceLister.Add(service) + + err := controller.processService(svcKey) + if 
!tc.expectErr && err != nil { + t.Fatalf("Failed to process service: %v", err) + } else if tc.expectErr && err == nil { + t.Fatalf("Expected an error when processing service") + } else if tc.expectErr && err != nil { + // ensure no leaked goroutines + controller.serviceLister.Delete(service) + err = controller.processService(svcKey) + if err != nil { + t.Fatalf("Failed to process service: %v", err) + } + return + } + + expectedSyncers := len(tc.exposedPortNames) + validateSyncers(t, controller, expectedSyncers, false) + svcClient := controller.client.CoreV1().Services(testServiceNamespace) + svc, err := svcClient.Get(context2.TODO(), testServiceName, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Service was not created successfully, err: %v", err) + } + if tc.svcNegClient != nil && !tc.ingress { + validateServiceStateAnnotationWithPortNameMap(t, svc, tc.expectNegPorts, controller.namer, tc.exposedPortNames) + validateNegCRs(t, svc, tc.svcNegClient, controller.namer, tc.exposedPortNames) + + } else { + validateServiceStateAnnotation(t, svc, tc.expectNegPorts, controller.namer) + } + + if tc.svcNegClient != nil { + // Populate manager's ServiceNetworkEndpointGroup Cache + negs, err := tc.svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(svc.Namespace).List(context2.TODO(), metav1.ListOptions{}) + if err != nil { + t.Errorf("failed to retrieve negs") + } + + for _, neg := range negs.Items { + n := neg + manager.svcNegLister.Add(&n) + } + } + + controller.serviceLister.Delete(service) + err = controller.processService(svcKey) + if err != nil { + t.Fatalf("Failed to process service: %v", err) + } + + validateSyncers(t, controller, expectedSyncers, true) + }) + } +} + +func validateNegCRs(t *testing.T, svc *v1.Service, svcNegClient svcnegclient.Interface, namer negtypes.NetworkEndpointGroupNamer, negPortNameMap map[int32]string) { + t.Helper() + + for port, expectedName := range negPortNameMap { + name := expectedName + if name == "" { + name = namer.NEG(svc.Namespace, svc.Name, port) + } + neg, err := svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(svc.Namespace).Get(context2.TODO(), name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("neg cr was not created successfully: err: %s", err) + } + + ownerReferences := neg.GetOwnerReferences() + if len(ownerReferences) != 1 { + t.Fatalf("neg cr should only have 1 owner reference, has %d", len(ownerReferences)) + } + + if ownerReferences[0].UID != svc.UID { + t.Fatalf("neg cr owner reference does not point to service %s/%s", svc.Namespace, svc.Name) + } + } +} + func validateSyncers(t *testing.T, controller *Controller, num int, stopped bool) { t.Helper() if len(controller.manager.(*syncerManager).syncerMap) != num { @@ -948,14 +1102,51 @@ func validateDestinationRuleAnnotationWithPortInfoMap(t *testing.T, usdr *unstru } } +// validateServiceStateAnnotationWithPortNameMap validates all aspects of the service annotation +// and also checks for custon names if specified in given portNameMap +func validateServiceStateAnnotationWithPortNameMap(t *testing.T, svc *apiv1.Service, svcPorts []int32, namer negtypes.NetworkEndpointGroupNamer, portNameMap map[int32]string) { + + negStatus := validateServiceStateAnnotationExceptNames(t, svc, svcPorts) + for svcPort, name := range portNameMap { + negName, ok := negStatus.NetworkEndpointGroups[strconv.Itoa(int(svcPort))] + if !ok { + t.Fatalf("NEG for port %d was not found", svcPort) + } + var expectName = name + if name == "" { + expectName = namer.NEG(svc.Namespace, svc.Name, 
svcPort) + } + if negName != expectName { + t.Fatalf("Expect NEG name of service port %d to be %q, but got %q", svcPort, expectName, negName) + } + } +} + func validateServiceStateAnnotation(t *testing.T, svc *apiv1.Service, svcPorts []int32, namer negtypes.NetworkEndpointGroupNamer) { t.Helper() + + negStatus := validateServiceStateAnnotationExceptNames(t, svc, svcPorts) + for _, svcPort := range svcPorts { + negName, ok := negStatus.NetworkEndpointGroups[strconv.Itoa(int(svcPort))] + if !ok { + t.Fatalf("NEG for port %d was not found", svcPort) + } + expectName := namer.NEG(svc.Namespace, svc.Name, svcPort) + if negName != expectName { + t.Fatalf("Expect NEG name of service port %d to be %q, but got %q", svcPort, expectName, negName) + } + } +} + +// validateServiceStateAnnotationExceptNames will validate all aspects of the status annotation except for the name +func validateServiceStateAnnotationExceptNames(t *testing.T, svc *apiv1.Service, svcPorts []int32) annotations.NegStatus { + t.Helper() if len(svcPorts) == 0 { v, ok := svc.Annotations[annotations.NEGStatusKey] if ok { t.Fatalf("Expected no NEG service state annotation when there are no servicePorts, got: %v", v) } - return + return annotations.NegStatus{} } v, ok := svc.Annotations[annotations.NEGStatusKey] @@ -988,23 +1179,13 @@ func validateServiceStateAnnotation(t *testing.T, svc *apiv1.Service, svcPorts [ t.Fatalf("Expect # of NEG to be %d, but got %d", len(svcPorts), len(negStatus.NetworkEndpointGroups)) } - for _, svcPort := range svcPorts { - negName, ok := negStatus.NetworkEndpointGroups[strconv.Itoa(int(svcPort))] - if !ok { - t.Fatalf("NEG for port %d was not found", svcPort) - } - expectName := namer.NEG(svc.Namespace, svc.Name, svcPort) - if negName != expectName { - t.Fatalf("Expect NEG name of service port %d to be %q, but got %q", svcPort, expectName, negName) - } - } - zoneInStatus := sets.NewString(negStatus.Zones...) expectedZones := sets.NewString(zones...) if !zoneInStatus.Equal(expectedZones) { t.Fatalf("Expect Zone %v, but got %v", expectedZones.List(), zoneInStatus.List()) } + return negStatus } func generateNegAnnotation(ingress bool, svcPorts []int32) string { @@ -1020,6 +1201,23 @@ func generateNegAnnotation(ingress bool, svcPorts []int32) string { return string(formattedAnnotation) } +func generateCustomNamedNegAnnotation(ingress bool, svcPorts map[int32]string) string { + var annotation annotations.NegAnnotation + enabledPorts := make(map[int32]annotations.NegAttributes) + for port, name := range svcPorts { + if name != "" { + enabledPorts[port] = annotations.NegAttributes{Name: name} + } else { + enabledPorts[port] = annotations.NegAttributes{} + } + } + + annotation.Ingress = ingress + annotation.ExposedPorts = enabledPorts + formattedAnnotation, _ := json.Marshal(annotation) + return string(formattedAnnotation) +} + func newTestIngress(name string) *v1beta1.Ingress { return &v1beta1.Ingress{ ObjectMeta: metav1.ObjectMeta{ @@ -1170,6 +1368,50 @@ func newTestService(c *Controller, negIngress bool, negSvcPorts []int32) *apiv1. 
return svc } +func newTestServiceCustomNamedNeg(c *Controller, negSvcPorts map[int32]string, ingress bool) *apiv1.Service { + svcAnnotations := map[string]string{} + if len(negSvcPorts) > 0 { + svcAnnotations[annotations.NEGAnnotationKey] = generateCustomNamedNegAnnotation(ingress, negSvcPorts) + } + + // append additional ports if the service does not contain the service port + for port, _ := range negSvcPorts { + exists := false + + for _, svcPort := range ports { + if svcPort.Port == port { + exists = true + break + } + } + + if !exists { + ports = append( + ports, + apiv1.ServicePort{ + Name: fmt.Sprintf("port%v", port), + Port: port, + TargetPort: intstr.FromString(fmt.Sprintf("%v", port)), + }, + ) + } + } + + svc := &apiv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: testServiceName, + Namespace: testServiceNamespace, + Annotations: svcAnnotations, + }, + Spec: apiv1.ServiceSpec{ + Ports: ports, + }, + } + + c.client.CoreV1().Services(testServiceNamespace).Create(context2.TODO(), svc, metav1.CreateOptions{}) + return svc +} + func newTestServiceCus(t *testing.T, c *Controller, namespace, name string, ports []int32) *apiv1.Service { svcPorts := []apiv1.ServicePort{} for _, port := range ports { diff --git a/pkg/neg/manager.go b/pkg/neg/manager.go index d73562524f..e0947c78ef 100644 --- a/pkg/neg/manager.go +++ b/pkg/neg/manager.go @@ -17,22 +17,29 @@ limitations under the License. package neg import ( + "context" "fmt" "reflect" "sync" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + negv1beta1 "k8s.io/ingress-gce/pkg/apis/svcneg/v1beta1" "k8s.io/ingress-gce/pkg/flags" "k8s.io/ingress-gce/pkg/neg/readiness" negsyncer "k8s.io/ingress-gce/pkg/neg/syncers" negtypes "k8s.io/ingress-gce/pkg/neg/types" + svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned" "k8s.io/klog" + utilpointer "k8s.io/utils/pointer" ) type serviceKey struct { @@ -55,6 +62,7 @@ type syncerManager struct { podLister cache.Indexer serviceLister cache.Indexer endpointLister cache.Indexer + svcNegLister cache.Indexer // TODO: lock per service instead of global lock mu sync.Mutex @@ -67,9 +75,11 @@ type syncerManager struct { syncerMap map[negtypes.NegSyncerKey]negtypes.NegSyncer // reflector handles NEG readiness gate and conditions for pods in NEG. reflector readiness.Reflector + //svcNegClient handles lifecycle operations for NEG CRs + svcNegClient svcnegclient.Interface } -func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter negtypes.ZoneGetter, podLister, serviceLister, endpointLister, nodeLister cache.Indexer) *syncerManager { +func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter negtypes.ZoneGetter, svcNegClient svcnegclient.Interface, podLister, serviceLister, endpointLister, nodeLister, svcNegLister cache.Indexer) *syncerManager { return &syncerManager{ namer: namer, recorder: recorder, @@ -79,8 +89,10 @@ func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer, recorder record. 
podLister: podLister, serviceLister: serviceLister, endpointLister: endpointLister, + svcNegLister: svcNegLister, svcPortMap: make(map[serviceKey]negtypes.PortInfoMap), syncerMap: make(map[negtypes.NegSyncerKey]negtypes.NegSyncer), + svcNegClient: svcNegClient, } } @@ -105,14 +117,19 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg manager.svcPortMap[key] = newPorts klog.V(3).Infof("EnsureSyncer %v/%v: syncing %v ports, removing %v ports, adding %v ports", namespace, name, newPorts, removes, adds) + errList := []error{} for svcPort, portInfo := range removes { syncer, ok := manager.syncerMap[getSyncerKey(namespace, name, svcPort, portInfo)] if ok { syncer.Stop() } + + err := manager.ensureDeleteSvcNegCR(namespace, portInfo.NegName) + if err != nil { + errList = append(errList, err) + } } - errList := []error{} // Ensure a syncer is running for each port that is being added. for svcPort, portInfo := range adds { syncerKey := getSyncerKey(namespace, name, svcPort, portInfo) @@ -121,6 +138,7 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg // determine the implementation that calculates NEG endpoints on each sync. epc := negsyncer.GetEndpointsCalculator(manager.nodeLister, manager.podLister, manager.zoneGetter, syncerKey, portInfo.RandomizeEndpoints) + syncer = negsyncer.NewTransactionSyncer( syncerKey, portInfo.NegName, @@ -137,6 +155,10 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg manager.syncerMap[syncerKey] = syncer } + if err := manager.ensureSvcNegCR(key, portInfo); err != nil { + errList = append(errList, err) + } + if syncer.IsStopped() { if err := syncer.Start(); err != nil { errList = append(errList, err) @@ -159,7 +181,6 @@ func (manager *syncerManager) StopSyncer(namespace, name string) { } delete(manager.svcPortMap, key) } - return } // Sync signals all syncers related to the service to sync. @@ -260,6 +281,29 @@ func (manager *syncerManager) ReadinessGateEnabled(syncerKey negtypes.NegSyncerK return false } +// ensureDeleteSvcNegCR will set the deletion timestamp for the specified NEG CR based +// on the given neg name. If the Deletion timestamp has already been set on the CR, no +// change will occur. +func (manager *syncerManager) ensureDeleteSvcNegCR(namespace, negName string) error { + if manager.svcNegClient == nil { + return nil + } + obj, exists, err := manager.svcNegLister.GetByKey(fmt.Sprintf("%s/%s", namespace, negName)) + if err != nil { + return fmt.Errorf("failed retrieving neg %s/%s to delete: %s", namespace, negName, err) + } + if !exists { + return nil + } + neg := obj.(*negv1beta1.ServiceNetworkEndpointGroup) + + if neg.GetDeletionTimestamp().IsZero() { + err = manager.svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(namespace).Delete(context.Background(), negName, metav1.DeleteOptions{}) + return err + } + return nil +} + // garbageCollectSyncer removes stopped syncer from syncerMap func (manager *syncerManager) garbageCollectSyncer() { manager.mu.Lock() @@ -329,6 +373,98 @@ func (manager *syncerManager) ensureDeleteNetworkEndpointGroup(name, zone string return manager.cloud.DeleteNetworkEndpointGroup(name, zone, meta.VersionGA) } +// ensureSvcNegCR ensures that if neg crd is enabled, a Neg CR exists for every +// desired Neg if it does not already exist. If an NEG CR already exists, and has the required labels +// its Object Meta will be updated if necessary. If the NEG CR does not have required labels an error is thrown. 
+func (manager *syncerManager) ensureSvcNegCR(svcKey serviceKey, portInfo negtypes.PortInfo) error { + if manager.svcNegClient == nil { + return nil + } + + obj, exists, err := manager.serviceLister.GetByKey(svcKey.Key()) + if err != nil { + klog.Errorf("Failed to retrieve service %s from store: %v", svcKey.Key(), err) + } + + if !exists { + return fmt.Errorf("Service not found") + } + + service := obj.(*v1.Service) + + gvk := schema.GroupVersionKind{Version: "v1", Kind: "Service"} + ownerReference := metav1.NewControllerRef(service, gvk) + ownerReference.BlockOwnerDeletion = utilpointer.BoolPtr(false) + labels := map[string]string{ + negtypes.NegCRManagedByKey: negtypes.NegCRControllerValue, + negtypes.NegCRServiceNameKey: svcKey.name, + negtypes.NegCRServicePortKey: fmt.Sprint(portInfo.PortTuple.Port), + } + + //TODO: Add finalizer after Neg CRD Garbage Collection is implemented. + newCR := negv1beta1.ServiceNetworkEndpointGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: portInfo.NegName, + Namespace: svcKey.namespace, + OwnerReferences: []metav1.OwnerReference{*ownerReference}, + Labels: labels, + }, + } + + negCR, err := manager.svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(svcKey.namespace).Get(context.Background(), portInfo.NegName, metav1.GetOptions{}) + if err != nil { + if !apierrors.IsNotFound(err) { + return fmt.Errorf("Error retrieving existing negs: %s", err) + } + + // Neg does not exist so create it + _, err = manager.svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(svcKey.namespace).Create(context.Background(), &newCR, metav1.CreateOptions{}) + return err + } + + if !negCR.GetDeletionTimestamp().IsZero() { + // Allow GC to complete + return fmt.Errorf("Neg CR already exists with this name and is marked for deletion. Allow GC to complete before recreating.") + } + + needUpdate, err := ensureNegCRLabels(negCR, labels) + if err != nil { + return err + } + needUpdate = ensureNegCROwnerRef(negCR, newCR.OwnerReferences) || needUpdate + + if needUpdate { + newCR.Status = negCR.Status + _, err = manager.svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(svcKey.namespace).Update(context.Background(), &newCR, metav1.UpdateOptions{}) + return err + } + return nil +} + +func ensureNegCRLabels(negCR *negv1beta1.ServiceNetworkEndpointGroup, labels map[string]string) (bool, error) { + //Check that required labels exist and are matching + existingLabels := negCR.GetLabels() + for key, value := range labels { + if existingVal := existingLabels[key]; existingVal != value { + return false, fmt.Errorf("Neg already exists with name %s but label %s has value %s instead of %s. 
Delete previous neg before creating this configuration", negCR.Name, key, existingVal, value) + } + } + + if !reflect.DeepEqual(existingLabels, labels) { + negCR.Labels = labels + return true, nil + } + return false, nil +} + +func ensureNegCROwnerRef(negCR *negv1beta1.ServiceNetworkEndpointGroup, expectedOwnerRef []metav1.OwnerReference) bool { + if !reflect.DeepEqual(negCR.OwnerReferences, expectedOwnerRef) { + negCR.OwnerReferences = expectedOwnerRef + return true + } + return false +} + // getSyncerKey encodes a service namespace, name, service port and targetPort into a string key func getSyncerKey(namespace, name string, servicePortKey negtypes.PortInfoMapKey, portInfo negtypes.PortInfo) negtypes.NegSyncerKey { networkEndpointType := negtypes.VmIpPortEndpointType diff --git a/pkg/neg/manager_test.go b/pkg/neg/manager_test.go index b500a15df5..8f4e5425d1 100644 --- a/pkg/neg/manager_test.go +++ b/pkg/neg/manager_test.go @@ -18,6 +18,7 @@ package neg import ( context2 "context" + "fmt" "reflect" "testing" "time" @@ -28,16 +29,22 @@ import ( apiv1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + apitypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + negv1beta1 "k8s.io/ingress-gce/pkg/apis/svcneg/v1beta1" backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned/fake" "k8s.io/ingress-gce/pkg/context" "k8s.io/ingress-gce/pkg/neg/readiness" "k8s.io/ingress-gce/pkg/neg/types" negtypes "k8s.io/ingress-gce/pkg/neg/types" + svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned" + negfake "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned/fake" + namer_util "k8s.io/ingress-gce/pkg/utils/namer" "k8s.io/legacy-cloud-providers/gce" ) @@ -68,24 +75,35 @@ const ( ) func NewTestSyncerManager(kubeClient kubernetes.Interface) *syncerManager { + return NewTestSyncerManagerWithNegClient(kubeClient, nil) +} + +func NewTestSyncerManagerWithNegClient(kubeClient kubernetes.Interface, svcNegClient svcnegclient.Interface) *syncerManager { backendConfigClient := backendconfigclient.NewSimpleClientset() namer := namer_util.NewNamer(ClusterID, "") ctxConfig := context.ControllerContextConfig{ Namespace: apiv1.NamespaceAll, - ResyncPeriod: 1 * time.Second, + ResyncPeriod: 0 * time.Second, DefaultBackendSvcPort: defaultBackend, } - context := context.NewControllerContext(nil, kubeClient, backendConfigClient, nil, nil, gce.NewFakeGCECloud(gce.DefaultTestClusterValues()), namer, "" /*kubeSystemUID*/, ctxConfig) + context := context.NewControllerContext(nil, kubeClient, backendConfigClient, nil, svcNegClient, gce.NewFakeGCECloud(gce.DefaultTestClusterValues()), namer, "" /*kubeSystemUID*/, ctxConfig) + + var svcNegInformer cache.Indexer + if svcNegClient != nil { + svcNegInformer = context.SvcNegInformer.GetIndexer() + } manager := newSyncerManager( namer, record.NewFakeRecorder(100), negtypes.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network"), negtypes.NewFakeZoneGetter(), + svcNegClient, context.PodInformer.GetIndexer(), context.ServiceInformer.GetIndexer(), context.EndpointInformer.GetIndexer(), context.NodeInformer.GetIndexer(), + svcNegInformer, ) manager.reflector = readiness.NewReadinessReflector(context, manager) return manager @@ -678,6 +696,420 @@ func TestFilterCommonPorts(t *testing.T) { } } +func 
TestNegCRCreations(t *testing.T) { + t.Parallel() + + svcNegClient := negfake.NewSimpleClientset() + + manager := NewTestSyncerManagerWithNegClient(fake.NewSimpleClientset(), svcNegClient) + namer := manager.namer + + svcName := "n1" + svcNamespace := "ns1" + customNegName := "neg-name" + var svcUID apitypes.UID = "svc-uid" + svc := &v1.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: svcNamespace, + Name: svcName, + }, + } + svc.SetUID(svcUID) + if err := manager.serviceLister.Add(svc); err != nil { + t.Errorf("failed to add sample service to service store: %s", err) + } + + expectedPortInfoMap := negtypes.NewPortInfoMap( + svcNamespace, + svcName, + types.NewSvcPortTupleSet( + negtypes.SvcPortTuple{Port: port1, TargetPort: targetPort1}, + negtypes.SvcPortTuple{Port: port2, TargetPort: targetPort2}), + namer, false, + map[negtypes.SvcPortTuple]string{negtypes.SvcPortTuple{Port: port1, TargetPort: targetPort1}: customNegName}) + + if err := manager.EnsureSyncers(svcNamespace, svcName, expectedPortInfoMap); err != nil { + t.Errorf("failed to ensure syncer %s/%s-%v: %v", svcNamespace, svcName, expectedPortInfoMap, err) + } + + // validate portInfo + if len(manager.svcPortMap) != 1 { + t.Errorf("manager should have one service has %d", len(manager.svcPortMap)) + } + + svcKey := serviceKey{namespace: svcNamespace, name: svcName} + if portInfoMap, svcFound := manager.svcPortMap[svcKey]; svcFound { + for key, expectedInfo := range expectedPortInfoMap { + if info, portFound := portInfoMap[key]; portFound { + if info.NegName != expectedInfo.NegName { + t.Errorf("expected NEG name %q, but got %q", expectedInfo.NegName, info.NegName) + } + + if info.PortTuple.Port == port1 && info.NegName != customNegName { + t.Errorf("expected Neg name for port %d, to be %s, but was %s", port1, customNegName, info.NegName) + } + } else { + t.Errorf("expected port %d of service %q to be registered", key.ServicePort, svcKey.Key()) + } + + neg, err := svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(svcKey.namespace).Get(context2.TODO(), expectedInfo.NegName, metav1.GetOptions{}) + if err != nil { + t.Errorf("error getting neg from neg client: %s", err) + } + checkNegCR(t, neg, svcKey, svcUID, expectedInfo) + } + + } else { + t.Errorf("expect service key %q to be registered", svcKey.Key()) + } + + // Second call of EnsureSyncers shouldn't cause any changes or errors + if err := manager.EnsureSyncers(svcNamespace, svcName, expectedPortInfoMap); err != nil { + t.Errorf("failed to ensure syncer after creating %s/%s-%v: %v", svcNamespace, svcName, expectedPortInfoMap, err) + } + + for _, expectedInfo := range expectedPortInfoMap { + neg, err := svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(svcKey.namespace).Get(context2.TODO(), expectedInfo.NegName, metav1.GetOptions{}) + if err != nil { + t.Errorf("error getting neg from neg client: %s", err) + } + checkNegCR(t, neg, svcKey, svcUID, expectedInfo) + } +} + +func TestNegCRDuplicateCreations(t *testing.T) { + t.Parallel() + + svc1Name := "svc1" + svc2Name := "svc2" + namespace := "ns1" + customNegName := "neg-name" + + var svc1UID apitypes.UID = "svc-1-uid" + var svc2UID apitypes.UID = "svc-2-uid" + svc1 := &v1.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: svc1Name, + }, + } + svc1.SetUID(svc1UID) + + svc2 := svc1.DeepCopy() + svc2.Name = svc2Name + 
svc2.SetUID(svc2UID) + + svcTuple1 := negtypes.SvcPortTuple{Port: port1, TargetPort: targetPort1} + svcTuple2 := negtypes.SvcPortTuple{Port: port2, TargetPort: targetPort1} + + testCases := []struct { + desc string + svc *v1.Service + svcTuple negtypes.SvcPortTuple + markedForDeletion bool + expectErr bool + modifyObjectMeta bool + crExists bool + }{ + {desc: "no cr exists yet", + crExists: false, + }, + {desc: "same service, same port configuration, original cr is not marked for deletion", + svc: svc1, + svcTuple: svcTuple1, + markedForDeletion: false, + expectErr: false, + crExists: true, + }, + {desc: "same service, same port configuration, original cr is not marked for deletion and has unexpected labels", + svc: svc1, + svcTuple: svcTuple1, + markedForDeletion: false, + expectErr: false, + modifyObjectMeta: true, + crExists: true, + }, + {desc: "same service, same port configuration, original cr is marked for deletion", + svc: svc1, + svcTuple: svcTuple1, + markedForDeletion: true, + expectErr: true, + crExists: true, + }, + {desc: "same service, different port configuration, original cr is not marked for deletion", + svc: svc1, + svcTuple: svcTuple2, + markedForDeletion: false, + expectErr: true, + crExists: true, + }, + {desc: "same service, different port configuration, original cr is marked for deletion", + svc: svc1, + svcTuple: svcTuple2, + markedForDeletion: true, + expectErr: true, + crExists: true, + }, + {desc: "different service name", + svc: svc2, + svcTuple: svcTuple1, + markedForDeletion: false, + expectErr: true, + crExists: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + + svcNegClient := negfake.NewSimpleClientset() + + manager := NewTestSyncerManagerWithNegClient(fake.NewSimpleClientset(), svcNegClient) + namer := manager.namer + + var testNeg negv1beta1.ServiceNetworkEndpointGroup + if tc.crExists { + if err := manager.serviceLister.Add(tc.svc); err != nil { + t.Errorf("failed to add original service to service store: %s", err) + } + svcKey := serviceKey{namespace: namespace, name: tc.svc.Name} + portInfo := negtypes.PortInfo{PortTuple: tc.svcTuple, NegName: customNegName} + testNeg = createNegCR(tc.svc, svcKey, portInfo) + if tc.markedForDeletion { + deletionTS := metav1.Now() + testNeg.SetDeletionTimestamp(&deletionTS) + } + + if tc.modifyObjectMeta { + testNeg.Labels["extra-label"] = "extra-value" + testNeg.OwnerReferences = append(testNeg.OwnerReferences, metav1.OwnerReference{UID: "extra-uid"}) + } + + _, err := svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(namespace1).Create(context2.TODO(), &testNeg, metav1.CreateOptions{}) + if err != nil { + t.Errorf("error creating test neg") + } + } + + if err := manager.serviceLister.Add(svc1); err != nil { + t.Errorf("failed to add sample service to service store: %s", err) + } + + portInfoMap := negtypes.NewPortInfoMap( + namespace, + svc1.Name, + types.NewSvcPortTupleSet(svcTuple1), + namer, false, + map[negtypes.SvcPortTuple]string{svcTuple1: customNegName}, + ) + + err := manager.EnsureSyncers(namespace, svc1.Name, portInfoMap) + if tc.expectErr && err == nil { + t.Errorf("expected error when ensuring syncer %s/%s %+v", namespace, svc1.Name, portInfoMap) + } else if !tc.expectErr && err != nil { + t.Errorf("unexpected error when ensuring syncer %s/%s %+v: %s", namespace, svc1.Name, portInfoMap, err) + } + + negs, err := svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(namespace).List(context2.TODO(), metav1.ListOptions{}) + if len(negs.Items) != 1 
{ + t.Errorf("expected to retrieve one negs, retrieved %d", len(negs.Items)) + } + + if tc.expectErr || tc.markedForDeletion { + // If errored, marked for deletion or neg cr is already correct, no update should occur + if !reflect.DeepEqual(testNeg, negs.Items[0]) { + t.Errorf("test neg should not have been updated") + } + } else { + svcKey := serviceKey{namespace: namespace, name: svc1Name} + portInfo := portInfoMap[negtypes.PortInfoMapKey{ServicePort: svcTuple1.Port, Subset: ""}] + checkNegCR(t, &negs.Items[0], svcKey, svc1.UID, portInfo) + } + + // make sure there is no leaking go routine + manager.StopSyncer(namespace, svc1Name) + }) + } +} + +func TestNegCRDeletions(t *testing.T) { + t.Parallel() + svcName := "n1" + svcNamespace := "ns1" + customNegName := "neg-name" + svcKey := serviceKey{namespace: svcNamespace, name: svcName} + var svcUID apitypes.UID = "svc-uid" + svc := &v1.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: svcNamespace, + Name: svcName, + }, + } + svc.SetUID(svcUID) + + testCases := []struct { + desc string + negsExist bool + hasDeletionTS bool + }{ + { + desc: "negs exist, no deletion timestamp, remove desired ports", + negsExist: true, + hasDeletionTS: false, + }, + { + desc: "negs already have deletion timestamp, remove desired ports", + negsExist: true, + hasDeletionTS: true, + }, + { + desc: "negs don't exist, remove desired ports", + negsExist: false, + hasDeletionTS: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + svcNegClient := negfake.NewSimpleClientset() + manager := NewTestSyncerManagerWithNegClient(fake.NewSimpleClientset(), svcNegClient) + if err := manager.serviceLister.Add(svc); err != nil { + t.Errorf("failed to add sample service to service store: %s", err) + } + + // set up manager current state before deleting + namer := manager.namer + expectedPortInfoMap := negtypes.NewPortInfoMap( + svcNamespace, + svcName, + types.NewSvcPortTupleSet( + negtypes.SvcPortTuple{Port: port1, TargetPort: targetPort1}, + negtypes.SvcPortTuple{Port: port2, TargetPort: targetPort2}), + namer, false, + map[negtypes.SvcPortTuple]string{negtypes.SvcPortTuple{Port: port1, TargetPort: targetPort1}: customNegName}, + ) + + svcPortMap := map[serviceKey]negtypes.PortInfoMap{ + serviceKey{namespace: svcNamespace, name: svcName}: expectedPortInfoMap, + } + manager.svcPortMap = svcPortMap + + var deletionTS metav1.Time + if tc.hasDeletionTS { + deletionTS = metav1.Now() + } + + if tc.negsExist { + for _, portInfo := range expectedPortInfoMap { + neg := createNegCR(svc, svcKey, portInfo) + if tc.hasDeletionTS { + neg.SetDeletionTimestamp(&deletionTS) + } + manager.svcNegLister.Add(&neg) + if _, err := svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(svcKey.namespace).Create(context2.TODO(), &neg, metav1.CreateOptions{}); err != nil { + t.Errorf("failed adding neg %s to fake neg client: %s", portInfo.NegName, err) + } + } + } + + if err := manager.EnsureSyncers(svcNamespace, svcName, nil); err != nil { + + t.Errorf("unexpected error when deleting negs: %s", err) + } + + negs, err := svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(svcKey.namespace).List(context2.TODO(), metav1.ListOptions{}) + if err != nil { + t.Errorf("error retrieving negs : %s", err) + } + + if tc.hasDeletionTS { + if len(negs.Items) != 2 { + t.Errorf("expected to retrieve two neg, retrieved %d", len(negs.Items)) + } + + for _, neg := range negs.Items { + if 
tc.hasDeletionTS && !neg.GetDeletionTimestamp().Equal(&deletionTS) { + t.Errorf("Expected neg cr %s deletion timestamp to be unchanged", neg.Name) + } else if !tc.hasDeletionTS && neg.GetDeletionTimestamp().IsZero() { + t.Errorf("Expected neg cr %s deletion timestamp to be set", neg.Name) + } + } + //clear all existing neg CRs + for _, portInfo := range expectedPortInfoMap { + if err = svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(svcKey.namespace).Delete(context2.TODO(), portInfo.NegName, metav1.DeleteOptions{}); err != nil { + t.Errorf("failed deleting neg %s in fake neg client: %s", portInfo.NegName, err) + } + } + } else { + if len(negs.Items) != 0 { + t.Errorf("expected to retrieve zero negs, retrieved %d", len(negs.Items)) + } + } + //ensure all goroutines are stopped + manager.StopSyncer(svcNamespace, svcName) + }) + } +} + +// Check that NEG CR Conditions exist and are in the expected condition +func checkNegCR(t *testing.T, neg *negv1beta1.ServiceNetworkEndpointGroup, svcKey serviceKey, svcUID apitypes.UID, expectedInfo negtypes.PortInfo) { + + if neg.GetNamespace() != svcKey.namespace { + t.Errorf("neg namespace is %s, expected %s", neg.GetNamespace(), svcKey.namespace) + } + + // TODO Add check for finalizer after Neg CRD Garbage Collection is implemented. + + //check labels + labels := neg.GetLabels() + if len(labels) != 3 { + t.Errorf("Expected 3 labels for neg %s, found %d", neg.Name, len(labels)) + } else { + + if val, ok := labels[negtypes.NegCRManagedByKey]; !ok || val != negtypes.NegCRControllerValue { + t.Errorf("Expected neg to have label %s, with value %s found %s", negtypes.NegCRManagedByKey, negtypes.NegCRControllerValue, val) + } + + if val, ok := labels[negtypes.NegCRServiceNameKey]; !ok || val != svcKey.name { + t.Errorf("Expected neg to have label %s, with value %s found %s", negtypes.NegCRServiceNameKey, expectedInfo.NegName, val) + } + + if val, ok := labels[negtypes.NegCRServicePortKey]; !ok || val != fmt.Sprint(expectedInfo.PortTuple.Port) { + t.Errorf("Expected neg to have label %s, with value %d found %s", negtypes.NegCRServicePortKey, expectedInfo.PortTuple.Port, val) + } + } + + ownerRefs := neg.GetOwnerReferences() + if len(ownerRefs) != 1 { + t.Errorf("Expected neg to have one owner ref, has %d", len(ownerRefs)) + } else { + + if ownerRefs[0].Name != svcKey.name { + t.Errorf("Expected neg owner ref to have name %s, instead has %s", svcKey.name, ownerRefs[0].Name) + } + + if ownerRefs[0].UID != svcUID { + t.Errorf("Expected neg owner ref to have UID %s, instead has %s", svcUID, ownerRefs[0].UID) + } + + if *ownerRefs[0].BlockOwnerDeletion != false { + t.Errorf("Expected neg owner ref not block owner deltion") + } + } +} + // populateSyncerManager for testing func populateSyncerManager(manager *syncerManager, kubeClient kubernetes.Interface) { namer := manager.namer @@ -822,3 +1254,24 @@ func portInfoUnion(p1, p2 negtypes.PortInfoMap) negtypes.PortInfoMap { p1.Merge(p2) return p1 } + +func createNegCR(service *v1.Service, svcKey serviceKey, portInfo negtypes.PortInfo) negv1beta1.ServiceNetworkEndpointGroup { + ownerReference := metav1.NewControllerRef(service, service.GroupVersionKind()) + *ownerReference.BlockOwnerDeletion = false + labels := map[string]string{ + negtypes.NegCRManagedByKey: negtypes.NegCRControllerValue, + negtypes.NegCRServiceNameKey: svcKey.name, + negtypes.NegCRServicePortKey: fmt.Sprint(portInfo.PortTuple.Port), + } + + // TODO: Add finalizer once NEG CRD GC is implemented + return 
negv1beta1.ServiceNetworkEndpointGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: portInfo.NegName, + Namespace: svcKey.namespace, + OwnerReferences: []metav1.OwnerReference{*ownerReference}, + Labels: labels, + ResourceVersion: "rv", + }, + } +} diff --git a/pkg/neg/types/interfaces.go b/pkg/neg/types/interfaces.go index 452354be83..e533a060a2 100644 --- a/pkg/neg/types/interfaces.go +++ b/pkg/neg/types/interfaces.go @@ -18,7 +18,7 @@ package types import ( "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/ingress-gce/pkg/composite" ) diff --git a/pkg/neg/types/types.go b/pkg/neg/types/types.go index c71c7ffb7a..b1efdbb5a7 100644 --- a/pkg/neg/types/types.go +++ b/pkg/neg/types/types.go @@ -41,6 +41,14 @@ const ( L7Mode = EndpointsCalculatorMode("L7") L4LocalMode = EndpointsCalculatorMode("L4, ExternalTrafficPolicy:Local") L4ClusterMode = EndpointsCalculatorMode("L4, ExternalTrafficPolicy:Cluster") + + // These keys are to be used as label keys for NEG CRs when enabled + + NegCRManagedByKey = "networking.gke.io/managed-by" + NegCRServiceNameKey = "networking.gke.io/service-name" + NegCRServicePortKey = "networking.gke.io/service-port" + // NegCRControllerValue is used as the value for the managed-by label on NEG CRs when enabled. + NegCRControllerValue = "neg-controller" ) // SvcPortTuple is the tuple representing one service port
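
[Editor's aside, not part of the diff] For context, a sketch of the service annotation value that exercises the new custom-name path, and the labels the resulting CR carries using the constants added in types.go. The negAnnotation/negAttributes types below are local mirrors of the pkg/annotations types and assume their current JSON shape; "my-service" is a hypothetical service name.

package main

import (
	"encoding/json"
	"fmt"
)

// Assumed JSON shape of annotations.NegAttributes / annotations.NegAnnotation.
type negAttributes struct {
	Name string `json:"name,omitempty"`
}

type negAnnotation struct {
	Ingress      bool                    `json:"ingress,omitempty"`
	ExposedPorts map[int32]negAttributes `json:"exposed_ports,omitempty"`
}

func main() {
	// cloud.google.com/neg value for a standalone NEG on port 80 with a custom name.
	// Per mergeStandaloneNEGsPortInfo above, this is rejected when ingress is true
	// or when the ServiceNetworkEndpointGroup CRD is not enabled.
	v, _ := json.Marshal(negAnnotation{ExposedPorts: map[int32]negAttributes{80: {Name: "neg-1"}}})
	fmt.Println(string(v)) // {"exposed_ports":{"80":{"name":"neg-1"}}}

	// Labels ensureSvcNegCR places on the CR for that port, built from the new keys.
	labels := map[string]string{
		"networking.gke.io/managed-by":   "neg-controller",
		"networking.gke.io/service-name": "my-service",
		"networking.gke.io/service-port": "80",
	}
	fmt.Println(labels)
}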