From 174a2748ab0654e3f24613270e0f83d82c7dfd61 Mon Sep 17 00:00:00 2001 From: Swetha Repakula Date: Mon, 6 Jul 2020 16:53:19 -0700 Subject: [PATCH] Create Neg CRs when neg crd is enabled - when EnableNegCrd flag is set, neg client is created - manager will create neg crs when creating syncers - manager will delete neg crs when deleting syncers --- cmd/glbc/main.go | 12 +- pkg/neg/controller.go | 30 +- pkg/neg/controller_test.go | 281 ++++++++++++++++++- pkg/neg/manager.go | 172 +++++++++++- pkg/neg/manager_test.go | 540 +++++++++++++++++++++++++++++++++++- pkg/neg/types/interfaces.go | 4 +- pkg/neg/types/types.go | 8 + 7 files changed, 1018 insertions(+), 29 deletions(-) diff --git a/cmd/glbc/main.go b/cmd/glbc/main.go index 8172b1bacd..f79981fdec 100644 --- a/cmd/glbc/main.go +++ b/cmd/glbc/main.go @@ -38,6 +38,7 @@ import ( "k8s.io/client-go/tools/record" backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned" frontendconfigclient "k8s.io/ingress-gce/pkg/frontendconfig/client/clientset/versioned" + svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned" ingctx "k8s.io/ingress-gce/pkg/context" "k8s.io/ingress-gce/pkg/controller" @@ -126,11 +127,18 @@ func main() { } } + var svcNegClient svcnegclient.Interface if flags.F.EnableNegCrd { negCRDMeta := svcneg.CRDMeta() if _, err := crdHandler.EnsureCRD(negCRDMeta); err != nil { klog.Fatalf("Failed to ensure ServiceNetworkEndpointGroup CRD: %v", err) } + + svcNegClient, err = svcnegclient.NewForConfig(kubeConfig) + if err != nil { + klog.Fatalf("Failed to create NetworkEndpointGroup client: %v", err) + } + } namer, err := app.NewNamer(kubeClient, flags.F.ClusterName, firewalls.DefaultFirewallName) @@ -160,7 +168,7 @@ func main() { ASMConfigMapNamespace: flags.F.ASMConfigMapBasedConfigNamespace, ASMConfigMapName: flags.F.ASMConfigMapBasedConfigCMName, } - ctx := ingctx.NewControllerContext(kubeConfig, kubeClient, backendConfigClient, frontendConfigClient, nil, cloud, namer, kubeSystemUID, ctxConfig) + ctx := ingctx.NewControllerContext(kubeConfig, kubeClient, backendConfigClient, frontendConfigClient, svcNegClient, cloud, namer, kubeSystemUID, ctxConfig) go app.RunHTTPServer(ctx.HealthCheck) if !flags.F.LeaderElection.LeaderElect { @@ -253,7 +261,7 @@ func runControllers(ctx *ingctx.ControllerContext) { } // TODO: Refactor NEG to use cloud mocks so ctx.Cloud can be referenced within NewController. 
-	negController := neg.NewController(negtypes.NewAdapter(ctx.Cloud), ctx, zoneGetter, ctx.ClusterNamer, flags.F.ResyncPeriod, flags.F.NegGCPeriod, flags.F.EnableReadinessReflector, flags.F.RunIngressController, flags.F.RunL4Controller)
+	negController := neg.NewController(negtypes.NewAdapter(ctx.Cloud), ctx, zoneGetter, ctx.ClusterNamer, flags.F.ResyncPeriod, flags.F.NegGCPeriod, flags.F.EnableReadinessReflector, flags.F.RunIngressController, flags.F.RunL4Controller, flags.F.EnableNegCrd)

 	go negController.Run(stopCh)
 	klog.V(0).Infof("negController started")
diff --git a/pkg/neg/controller.go b/pkg/neg/controller.go
index f2a8f01da9..701846dcdc 100644
--- a/pkg/neg/controller.go
+++ b/pkg/neg/controller.go
@@ -28,6 +28,7 @@ import (
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/types"
 	apimachinerytypes "k8s.io/apimachinery/pkg/types"
+	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/dynamic"
@@ -99,6 +100,9 @@ type Controller struct {
 	// runL4 indicates whether to run NEG controller that processes L4 ILB services
 	runL4 bool
+
+	// enableNegCrd indicates whether the NEG CRD has been enabled
+	enableNegCrd bool
 }

 // NewController returns a network endpoint group controller.
@@ -112,6 +116,7 @@ func NewController(
 	enableReadinessReflector bool,
 	runIngress bool,
 	runL4Controller bool,
+	enableNegCrd bool,
 ) *Controller {
 	// init event recorder
 	// TODO: move event recorder initializer to main. Reuse it among controllers.
@@ -123,7 +128,7 @@ func NewController(
 	recorder := eventBroadcaster.NewRecorder(scheme.Scheme,
 		apiv1.EventSource{Component: "neg-controller"})

-	manager := newSyncerManager(namer, recorder, cloud, zoneGetter, ctx.PodInformer.GetIndexer(), ctx.ServiceInformer.GetIndexer(), ctx.EndpointInformer.GetIndexer(), ctx.NodeInformer.GetIndexer())
+	manager := newSyncerManager(namer, recorder, cloud, zoneGetter, ctx.NegClient, ctx.PodInformer.GetIndexer(), ctx.ServiceInformer.GetIndexer(), ctx.EndpointInformer.GetIndexer(), ctx.NodeInformer.GetIndexer())
 	var reflector readiness.Reflector
 	if enableReadinessReflector {
 		reflector = readiness.NewReadinessReflector(ctx, manager)
@@ -151,6 +156,7 @@
 		reflector:    reflector,
 		collector:    ctx.ControllerMetrics,
 		runL4:        runL4Controller,
+		enableNegCrd: enableNegCrd,
 	}

 	if runIngress {
@@ -376,8 +382,8 @@ func (c *Controller) processService(key string) error {
 	}
 	if !exists {
 		c.collector.DeleteNegService(key)
-		c.manager.StopSyncer(namespace, name)
-		return nil
+
+		return c.manager.StopSyncer(namespace, name)
 	}

 	service := obj.(*apiv1.Service)
@@ -431,9 +437,15 @@ func (c *Controller) processService(key string) error {
 	klog.V(4).Infof("Service %q does not need any NEG. Skipping", key)
 	c.collector.DeleteNegService(key)
 	// neg annotation is not found or NEG is not enabled
-	c.manager.StopSyncer(namespace, name)
+	var errList []error
+	if err = c.manager.StopSyncer(namespace, name); err != nil {
+		errList = append(errList, err)
+	}
 	// delete the annotation
-	return c.syncNegStatusAnnotation(namespace, name, make(negtypes.PortInfoMap))
+	if err = c.syncNegStatusAnnotation(namespace, name, make(negtypes.PortInfoMap)); err != nil {
+		errList = append(errList, err)
+	}
+	return utilerrors.NewAggregate(errList)
 }

 // mergeIngressPortInfo merges Ingress PortInfo into portInfoMap if the service has Enable Ingress annotation.
@@ -481,12 +493,16 @@ func (c *Controller) mergeStandaloneNEGsPortInfo(service *apiv1.Service, name ty ) } - exposedNegSvcPort, _, err := negServicePorts(negAnnotation, knowSvcPortSet) + exposedNegSvcPort, customNames, err := negServicePorts(negAnnotation, knowSvcPortSet) if err != nil { return err } - if err := portInfoMap.Merge(negtypes.NewPortInfoMap(name.Namespace, name.Name, exposedNegSvcPort, c.namer /*readinessGate*/, true, nil)); err != nil { + if !c.enableNegCrd { + customNames = nil + } + + if err := portInfoMap.Merge(negtypes.NewPortInfoMap(name.Namespace, name.Name, exposedNegSvcPort, c.namer /*readinessGate*/, true, customNames)); err != nil { return fmt.Errorf("failed to merge service ports exposed as standalone NEGs (%v) into ingress referenced service ports (%v): %v", exposedNegSvcPort, portInfoMap, err) } } diff --git a/pkg/neg/controller_test.go b/pkg/neg/controller_test.go index 882b396ccd..bead6db762 100644 --- a/pkg/neg/controller_test.go +++ b/pkg/neg/controller_test.go @@ -33,6 +33,7 @@ import ( istioV1alpha3 "istio.io/api/networking/v1alpha3" apiv1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/api/networking/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -49,6 +50,8 @@ import ( "k8s.io/ingress-gce/pkg/context" "k8s.io/ingress-gce/pkg/flags" negtypes "k8s.io/ingress-gce/pkg/neg/types" + svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned" + negfake "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned/fake" "k8s.io/ingress-gce/pkg/utils" namer_util "k8s.io/ingress-gce/pkg/utils/namer" "k8s.io/legacy-cloud-providers/gce" @@ -75,6 +78,10 @@ var ( ) func newTestController(kubeClient kubernetes.Interface) *Controller { + return newTestControllerWithNegClient(kubeClient, nil) +} + +func newTestControllerWithNegClient(kubeClient kubernetes.Interface, negClient svcnegclient.Interface) *Controller { backendConfigClient := backendconfigclient.NewSimpleClientset() namer := namer_util.NewNamer(ClusterID, "") dynamicSchema := runtime.NewScheme() @@ -90,7 +97,7 @@ func newTestController(kubeClient kubernetes.Interface) *Controller { ASMConfigMapNamespace: "kube-system", ASMConfigMapName: "ingress-controller-config-test", } - context := context.NewControllerContext(nil, kubeClient, backendConfigClient, nil, nil, gce.NewFakeGCECloud(gce.DefaultTestClusterValues()), namer, "" /*kubeSystemUID*/, ctxConfig) + context := context.NewControllerContext(nil, kubeClient, backendConfigClient, nil, negClient, gce.NewFakeGCECloud(gce.DefaultTestClusterValues()), namer, "" /*kubeSystemUID*/, ctxConfig) // Hack the context.Init func. 
 	configMapInformer := informerv1.NewConfigMapInformer(kubeClient, context.Namespace, context.ResyncPeriod, utils.NewNamespaceIndexer())
@@ -105,6 +112,11 @@ func newTestController(kubeClient kubernetes.Interface) *Controller {
 	context.DestinationRuleInformer = drDynamicInformer.Informer()
 	context.DestinationRuleClient = dynamicClient.Resource(destrinationGVR)

+	enableNegCrd := false
+	if negClient != nil {
+		enableNegCrd = true
+	}
+
 	controller := NewController(
 		negtypes.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network"),
 		context,
@@ -116,6 +128,7 @@
 		false,
 		true,
 		false,
+		enableNegCrd,
 	)
 	return controller
 }
@@ -870,6 +883,161 @@ func TestMergeCSMPortInfoMap(t *testing.T) {
 	}
 }

+func TestEnableNegCRD(t *testing.T) {
+	t.Parallel()
+
+	testCases := []struct {
+		desc             string
+		exposedPortNames map[int32]string
+		expectNegPorts   []int32
+		negClient        svcnegclient.Interface
+	}{
+		{
+			"No ingress, multiple ports all custom names",
+			map[int32]string{80: "neg-1", 443: "neg-2", 8081: "neg-3", 8080: "neg-4"},
+			[]int32{80, 443, 8081, 8080},
+			negfake.NewSimpleClientset(),
+		},
+		{
+			"No ingress, multiple ports, mix of custom names",
+			map[int32]string{80: "neg-1", 443: "", 8081: "neg-3"},
+			[]int32{80, 443, 8081},
+			negfake.NewSimpleClientset(),
+		},
+		{
+			"No ingress, one port, custom name",
+			map[int32]string{80: "neg-1"},
+			[]int32{80},
+			negfake.NewSimpleClientset(),
+		},
+		{
+			"No ingress, one port, custom name, neg crd is not enabled",
+			map[int32]string{80: "neg-1"},
+			[]int32{80},
+			nil,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.desc, func(t *testing.T) {
+			controller := newTestControllerWithNegClient(fake.NewSimpleClientset(), tc.negClient)
+			defer controller.stop()
+			svcKey := utils.ServiceKeyFunc(testServiceNamespace, testServiceName)
+			service := newTestServiceCustomNamedNeg(controller, tc.exposedPortNames)
+			controller.serviceLister.Add(service)
+
+			err := controller.processService(svcKey)
+			if err != nil {
+				t.Fatalf("Failed to process service: %v", err)
+			}
+
+			expectedSyncers := len(tc.exposedPortNames)
+			validateSyncers(t, controller, expectedSyncers, false)
+			svcClient := controller.client.CoreV1().Services(testServiceNamespace)
+			svc, err := svcClient.Get(context2.TODO(), testServiceName, metav1.GetOptions{})
+			if err != nil {
+				t.Fatalf("Service was not created successfully, err: %v", err)
+			}
+			if tc.negClient != nil {
+				validateServiceStateAnnotationWithPortNameMap(t, svc, tc.expectNegPorts, controller.namer, tc.exposedPortNames)
+				validateNegCRs(t, svc, tc.negClient, controller.namer, tc.exposedPortNames)
+			} else {
+				validateServiceStateAnnotation(t, svc, tc.expectNegPorts, controller.namer)
+			}
+			controller.serviceLister.Delete(service)
+			err = controller.processService(svcKey)
+			if err != nil {
+				t.Fatalf("Failed to process service: %v", err)
+			}
+
+			validateSyncers(t, controller, expectedSyncers, true)
+			if tc.negClient != nil {
+				validateNegCRsDeletionTS(t, tc.negClient, controller.namer, tc.exposedPortNames, testServiceNamespace, testServiceName)
+			}
+		})
+	}
+}
+
+func TestEnableNegCRDTransitions(t *testing.T) {
+	t.Parallel()
+
+	exposedPortNames := map[int32]string{80: "neg-1", 443: "", 8081: "", 8080: ""}
+	expectNegPorts := []int32{80, 443, 8081, 8080}
+
+	negClient := negfake.NewSimpleClientset()
+	controller := newTestControllerWithNegClient(fake.NewSimpleClientset(), negClient)
+	defer controller.stop()
+	svcKey := utils.ServiceKeyFunc(testServiceNamespace, testServiceName)
+	service := newTestServiceCustomNamedNeg(controller, exposedPortNames)
+	controller.serviceLister.Add(service)
+
+	err := controller.processService(svcKey)
+	if err != nil {
+		t.Fatalf("Failed to process service: %v", err)
+	}
+
+	expectedSyncers := len(exposedPortNames)
+	validateSyncers(t, controller, expectedSyncers, false)
+	svcClient := controller.client.CoreV1().Services(testServiceNamespace)
+	svc, err := svcClient.Get(context2.TODO(), testServiceName, metav1.GetOptions{})
+	if err != nil {
+		t.Fatalf("Service was not created successfully, err: %v", err)
+	}
+	validateServiceStateAnnotationWithPortNameMap(t, svc, expectNegPorts, controller.namer, exposedPortNames)
+	validateNegCRs(t, svc, negClient, controller.namer, exposedPortNames)

+	service = newTestServiceCustomNamedNeg(controller, map[int32]string{})
+	controller.serviceLister.Update(service)
+	err = controller.processService(svcKey)
+	if err != nil {
+		t.Fatalf("Failed to process service: %v", err)
+	}
+	validateSyncers(t, controller, expectedSyncers, true)
+	validateNegCRsDeletionTS(t, negClient, controller.namer, exposedPortNames, testServiceNamespace, testServiceName)
+}
+
+func validateNegCRsDeletionTS(t *testing.T, negClient svcnegclient.Interface, namer negtypes.NetworkEndpointGroupNamer, negPortNameMap map[int32]string, namespace, svcName string) {
+
+	for port, expectedName := range negPortNameMap {
+		name := expectedName
+		if name == "" {
+			name = namer.NEG(namespace, svcName, port)
+		}
+		neg, err := negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(namespace).Get(context2.TODO(), name, metav1.GetOptions{})
+		if err != nil {
+			t.Fatalf("neg cr was not created successfully: err: %s", err)
+		}

+		if neg.GetDeletionTimestamp().IsZero() {
+			t.Fatalf("neg cr deletion timestamp has not been set. namespace:%s service-name:%s neg-name:%s neg-port:%d", namespace, svcName, name, port)
+		}
+	}
+}
+
+func validateNegCRs(t *testing.T, svc *v1.Service, negClient svcnegclient.Interface, namer negtypes.NetworkEndpointGroupNamer, negPortNameMap map[int32]string) {
+
+	for port, expectedName := range negPortNameMap {
+		name := expectedName
+		if name == "" {
+			name = namer.NEG(svc.Namespace, svc.Name, port)
+		}
+		neg, err := negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(svc.Namespace).Get(context2.TODO(), name, metav1.GetOptions{})
+		if err != nil {
+			t.Fatalf("neg cr was not created successfully: err: %s", err)
+		}

+		ownerReferences := neg.GetOwnerReferences()
+		if len(ownerReferences) != 1 {
+			t.Fatalf("neg cr should only have 1 owner reference, has %d", len(ownerReferences))
+		}

+		if ownerReferences[0].UID != svc.UID {
+			t.Fatalf("neg cr owner reference does not point to service %s/%s", svc.Namespace, svc.Name)
+		}
+	}
+}
+
 func validateSyncers(t *testing.T, controller *Controller, num int, stopped bool) {
 	t.Helper()
 	if len(controller.manager.(*syncerManager).syncerMap) != num {
@@ -948,14 +1116,51 @@ func validateDestinationRuleAnnotationWithPortInfoMap(t *testing.T, usdr *unstru
 	}
 }

+// validateServiceStateAnnotationWithPortNameMap validates all aspects of the service annotation
+// and also checks for custom names if specified in the given portNameMap
+func validateServiceStateAnnotationWithPortNameMap(t *testing.T, svc *apiv1.Service, svcPorts []int32, namer negtypes.NetworkEndpointGroupNamer, portNameMap map[int32]string) {

+	negStatus := validateServiceStateAnnotationExceptNames(t, svc, svcPorts)
+	for svcPort, name := range portNameMap {
+		negName, ok := negStatus.NetworkEndpointGroups[strconv.Itoa(int(svcPort))]
+		if !ok {
+			t.Fatalf("NEG for port %d was not found", svcPort)
+		}
+		var expectName = name
+		if name == "" {
+			expectName = namer.NEG(svc.Namespace, svc.Name, svcPort)
+		}
+		if negName != expectName {
+			t.Fatalf("Expect NEG name of service port %d to be %q, but got %q", svcPort, expectName, negName)
+		}
+	}
+}
+
 func validateServiceStateAnnotation(t *testing.T, svc *apiv1.Service, svcPorts []int32, namer negtypes.NetworkEndpointGroupNamer) {
 	t.Helper()
+
+	negStatus := validateServiceStateAnnotationExceptNames(t, svc, svcPorts)
+	for _, svcPort := range svcPorts {
+		negName, ok := negStatus.NetworkEndpointGroups[strconv.Itoa(int(svcPort))]
+		if !ok {
+			t.Fatalf("NEG for port %d was not found", svcPort)
+		}
+		expectName := namer.NEG(svc.Namespace, svc.Name, svcPort)
+		if negName != expectName {
+			t.Fatalf("Expect NEG name of service port %d to be %q, but got %q", svcPort, expectName, negName)
+		}
+	}
+}
+
+// validateServiceStateAnnotationExceptNames validates all aspects of the status annotation except for the NEG names
+func validateServiceStateAnnotationExceptNames(t *testing.T, svc *apiv1.Service, svcPorts []int32) annotations.NegStatus {
+	t.Helper()
 	if len(svcPorts) == 0 {
 		v, ok := svc.Annotations[annotations.NEGStatusKey]
 		if ok {
 			t.Fatalf("Expected no NEG service state annotation when there are no servicePorts, got: %v", v)
 		}
-		return
+		return annotations.NegStatus{}
 	}

 	v, ok := svc.Annotations[annotations.NEGStatusKey]
@@ -988,23 +1193,13 @@ func validateServiceStateAnnotation(t *testing.T, svc *apiv1.Service, svcPorts [
 		t.Fatalf("Expect # of NEG to be %d, but got %d", len(svcPorts), len(negStatus.NetworkEndpointGroups))
 	}

-	for _, svcPort := range svcPorts {
-		negName, ok := negStatus.NetworkEndpointGroups[strconv.Itoa(int(svcPort))]
-		if !ok {
-			t.Fatalf("NEG for port %d was not found", svcPort)
-		}
-		expectName := namer.NEG(svc.Namespace, svc.Name, svcPort)
-		if negName != expectName {
-			t.Fatalf("Expect NEG name of service port %d to be %q, but got %q", svcPort, expectName, negName)
-		}
-	}
-
 	zoneInStatus := sets.NewString(negStatus.Zones...)
 	expectedZones := sets.NewString(zones...)

 	if !zoneInStatus.Equal(expectedZones) {
 		t.Fatalf("Expect Zone %v, but got %v", expectedZones.List(), zoneInStatus.List())
 	}
+	return negStatus
 }

 func generateNegAnnotation(ingress bool, svcPorts []int32) string {
@@ -1020,6 +1215,22 @@
 	return string(formattedAnnotation)
 }

+func generateCustomNamedNegAnnotation(svcPorts map[int32]string) string {
+	var annotation annotations.NegAnnotation
+	enabledPorts := make(map[int32]annotations.NegAttributes)
+	for port, name := range svcPorts {
+		if name != "" {
+			enabledPorts[port] = annotations.NegAttributes{Name: name}
+		} else {
+			enabledPorts[port] = annotations.NegAttributes{}
+		}
+	}
+
+	annotation.ExposedPorts = enabledPorts
+	formattedAnnotation, _ := json.Marshal(annotation)
+	return string(formattedAnnotation)
+}
+
 func newTestIngress(name string) *v1beta1.Ingress {
 	return &v1beta1.Ingress{
 		ObjectMeta: metav1.ObjectMeta{
@@ -1170,6 +1381,50 @@ func newTestService(c *Controller, negIngress bool, negSvcPorts []int32) *apiv1.
 	return svc
 }

+func newTestServiceCustomNamedNeg(c *Controller, negSvcPorts map[int32]string) *apiv1.Service {
+	svcAnnotations := map[string]string{}
+	if len(negSvcPorts) > 0 {
+		svcAnnotations[annotations.NEGAnnotationKey] = generateCustomNamedNegAnnotation(negSvcPorts)
+	}
+
+	// append additional ports if the service does not contain the service port
+	for port := range negSvcPorts {
+		exists := false
+
+		for _, svcPort := range ports {
+			if svcPort.Port == port {
+				exists = true
+				break
+			}
+		}
+
+		if !exists {
+			ports = append(
+				ports,
+				apiv1.ServicePort{
+					Name:       fmt.Sprintf("port%v", port),
+					Port:       port,
+					TargetPort: intstr.FromString(fmt.Sprintf("%v", port)),
+				},
+			)
+		}
+	}
+
+	svc := &apiv1.Service{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:        testServiceName,
+			Namespace:   testServiceNamespace,
+			Annotations: svcAnnotations,
+		},
+		Spec: apiv1.ServiceSpec{
+			Ports: ports,
+		},
+	}
+
+	c.client.CoreV1().Services(testServiceNamespace).Create(context2.TODO(), svc, metav1.CreateOptions{})
+	return svc
+}
+
 func newTestServiceCus(t *testing.T, c *Controller, namespace, name string, ports []int32) *apiv1.Service {
 	svcPorts := []apiv1.ServicePort{}
 	for _, port := range ports {
diff --git a/pkg/neg/manager.go b/pkg/neg/manager.go
index d73562524f..720f0f19b8 100644
--- a/pkg/neg/manager.go
+++ b/pkg/neg/manager.go
@@ -17,22 +17,29 @@ limitations under the License.
package neg import ( + "context" "fmt" "reflect" + "strconv" "sync" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + negv1beta1 "k8s.io/ingress-gce/pkg/apis/svcneg/v1beta1" "k8s.io/ingress-gce/pkg/flags" "k8s.io/ingress-gce/pkg/neg/readiness" negsyncer "k8s.io/ingress-gce/pkg/neg/syncers" negtypes "k8s.io/ingress-gce/pkg/neg/types" + svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned" "k8s.io/klog" + "k8s.io/kubernetes/pkg/apis/core" ) type serviceKey struct { @@ -67,9 +74,11 @@ type syncerManager struct { syncerMap map[negtypes.NegSyncerKey]negtypes.NegSyncer // reflector handles NEG readiness gate and conditions for pods in NEG. reflector readiness.Reflector + //negClient handles lifecycle operations for NEG CRs + negClient svcnegclient.Interface } -func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter negtypes.ZoneGetter, podLister, serviceLister, endpointLister, nodeLister cache.Indexer) *syncerManager { +func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter negtypes.ZoneGetter, negClient svcnegclient.Interface, podLister, serviceLister, endpointLister, nodeLister cache.Indexer) *syncerManager { return &syncerManager{ namer: namer, recorder: recorder, @@ -81,6 +90,7 @@ func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer, recorder record. endpointLister: endpointLister, svcPortMap: make(map[serviceKey]negtypes.PortInfoMap), syncerMap: make(map[negtypes.NegSyncerKey]negtypes.NegSyncer), + negClient: negClient, } } @@ -121,6 +131,11 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg // determine the implementation that calculates NEG endpoints on each sync. epc := negsyncer.GetEndpointsCalculator(manager.nodeLister, manager.podLister, manager.zoneGetter, syncerKey, portInfo.RandomizeEndpoints) + + if err := manager.ensureCreateNegCR(key, portInfo); err != nil { + errList = append(errList, err) + } + syncer = negsyncer.NewTransactionSyncer( syncerKey, portInfo.NegName, @@ -147,7 +162,7 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg } // StopSyncer stops all syncers for the input service. -func (manager *syncerManager) StopSyncer(namespace, name string) { +func (manager *syncerManager) StopSyncer(namespace, name string) error { manager.mu.Lock() defer manager.mu.Unlock() key := getServiceKey(namespace, name) @@ -159,7 +174,7 @@ func (manager *syncerManager) StopSyncer(namespace, name string) { } delete(manager.svcPortMap, key) } - return + return manager.ensureDeleteNegServiceCRs(namespace, name) } // Sync signals all syncers related to the service to sync. 
@@ -260,6 +275,32 @@ func (manager *syncerManager) ReadinessGateEnabled(syncerKey negtypes.NegSyncerK
 	return false
 }

+func (manager *syncerManager) ensureDeleteNegServiceCRs(namespace, name string) error {
+	if manager.negClient == nil {
+		return nil
+	}
+
+	labelSelector := labels.Set(map[string]string{negtypes.NegCRServiceNameKey: name}).String()
+	negs, err := manager.negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(namespace).List(context.Background(), metav1.ListOptions{LabelSelector: labelSelector})
+	if err != nil {
+		klog.Errorf("failed to retrieve neg crs for service %s/%s to delete: %s", namespace, name, err)
+		return err
+	}
+
+	errList := []error{}
+	for _, neg := range negs.Items {
+		deletionTS := metav1.Now()
+		if neg.GetDeletionTimestamp().IsZero() {
+			neg.SetDeletionTimestamp(&deletionTS)
+			_, err := manager.negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(namespace).Update(context.Background(), &neg, metav1.UpdateOptions{})
+			if err != nil {
+				errList = append(errList, err)
+			}
+		}
+	}
+	return utilerrors.NewAggregate(errList)
+}
+
 // garbageCollectSyncer removes stopped syncer from syncerMap
 func (manager *syncerManager) garbageCollectSyncer() {
 	manager.mu.Lock()
@@ -329,6 +370,129 @@ func (manager *syncerManager) ensureDeleteNetworkEndpointGroup(name, zone string
 	return manager.cloud.DeleteNetworkEndpointGroup(name, zone, meta.VersionGA)
 }

+func (manager *syncerManager) ensureCreateNegCR(svcKey serviceKey, portInfo negtypes.PortInfo) error {
+	if manager.negClient == nil {
+		return nil
+	}
+
+	obj, exists, err := manager.serviceLister.GetByKey(svcKey.Key())
+	if err != nil {
+		klog.Errorf("Failed to retrieve service %s from store: %v", svcKey.Key(), err)
+	}
+
+	if !exists {
+		return fmt.Errorf("service %s not found", svcKey.Key())
+	}
+
+	service := obj.(*v1.Service)
+
+	createCR, err := manager.checkForExistingNegCR(service, portInfo)
+	if err != nil {
+		return fmt.Errorf("failed to check for an existing NEG CR: %s", err)
+	}
+	if createCR {
+		ownerReference := metav1.NewControllerRef(service, service.GroupVersionKind())
+		*ownerReference.BlockOwnerDeletion = false
+		labels := map[string]string{
+			negtypes.NegCRManagedByKey:   negtypes.NegCRControllerValue,
+			negtypes.NegCRServiceNameKey: svcKey.name,
+			negtypes.NegCRServicePortKey: fmt.Sprint(portInfo.PortTuple.Port),
+		}
+
+		// TODO: Add finalizer after Neg CRD Garbage Collection is implemented.
+		now := metav1.Now()
+		negCR := negv1beta1.ServiceNetworkEndpointGroup{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:            portInfo.NegName,
+				Namespace:       svcKey.namespace,
+				OwnerReferences: []metav1.OwnerReference{*ownerReference},
+				Labels:          labels,
+			},
+			Status: negv1beta1.ServiceNetworkEndpointGroupStatus{
+				Conditions: []negv1beta1.Condition{
+					negv1beta1.Condition{
+						Type:               negv1beta1.Initialized,
+						Status:             core.ConditionUnknown,
+						LastTransitionTime: now,
+						Reason:             "NEG CR created",
+					},
+					negv1beta1.Condition{
+						Type:               negv1beta1.Synced,
+						Status:             core.ConditionUnknown,
+						LastTransitionTime: now,
+						Reason:             "NEG CR created",
+					},
+				},
+			},
+		}
+		_, err = manager.negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(svcKey.namespace).Create(context.Background(), &negCR, metav1.CreateOptions{})
+		return err
+	}
+
+	return nil
+}
+
+func (manager *syncerManager) checkForExistingNegCR(svc *v1.Service, portInfo negtypes.PortInfo) (bool, error) {

+	negCR, err := manager.negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(svc.Namespace).Get(context.Background(), portInfo.NegName, metav1.GetOptions{})
+	if err != nil {
+		if !apierrors.IsNotFound(err) {
+			return false, fmt.Errorf("error retrieving existing NEG CR: %s", err)
+		}
+		return true, nil
+	}
+
+	if !negCR.GetDeletionTimestamp().IsZero() {
+		// If the Neg CR is already marked for deletion, allow GC to complete before creating a new CR
+		return false, nil
+	}
+
+	deletionTS := metav1.Now()
+	ownerReferences := negCR.GetOwnerReferences()
+	if len(ownerReferences) != 1 {
+		negCR.SetDeletionTimestamp(&deletionTS)
+		_, err := manager.negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(svc.Namespace).Update(context.Background(), negCR, metav1.UpdateOptions{})
+		return false, err
+	}
+	labels := negCR.GetLabels()
+	p, err := strconv.ParseInt(labels[negtypes.NegCRServicePortKey], 10, 32)
+	if err != nil {
+		// Invalid NEG CR, delete CR.
+		return true, manager.negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(svc.Namespace).Delete(context.Background(), portInfo.NegName, metav1.DeleteOptions{})
+	}
+	svcPort := int32(p)
+
+	if ownerReferences[0].UID != svc.UID {
+		svcKey := getServiceKey(negCR.Namespace, ownerReferences[0].Name)
+		_, exists, err := manager.serviceLister.GetByKey(svcKey.Key())
+		if err != nil {
+			return false, err
+		}
+
+		if exists {
+			if currentPorts, ok := manager.svcPortMap[svcKey]; ok {
+				for key, portInfo := range currentPorts {
+					if key.ServicePort == svcPort {
+						return false, fmt.Errorf("NEG with name %s already exists", portInfo.NegName)
+					}
+				}
+			}
+		}
+		negCR.SetDeletionTimestamp(&deletionTS)
+		_, err = manager.negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(svc.Namespace).Update(context.Background(), negCR, metav1.UpdateOptions{})
+		return false, err
+	}
+
+	if svcPort != portInfo.PortTuple.Port {
+		negCR.SetDeletionTimestamp(&deletionTS)
+		_, err := manager.negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(svc.Namespace).Update(context.Background(), negCR, metav1.UpdateOptions{})
+		return false, err
+	}
+
+	return false, nil
+}
+
 // getSyncerKey encodes a service namespace, name, service port and targetPort into a string key
 func getSyncerKey(namespace, name string, servicePortKey negtypes.PortInfoMapKey, portInfo negtypes.PortInfo) negtypes.NegSyncerKey {
 	networkEndpointType := negtypes.VmIpPortEndpointType
diff --git a/pkg/neg/manager_test.go b/pkg/neg/manager_test.go
index b500a15df5..db9f23aa74 100644
--- a/pkg/neg/manager_test.go
+++ b/pkg/neg/manager_test.go
@@ -18,26 +18,33 @@ package neg

 import (
 	context2 "context"
+	"fmt"
 	"reflect"
 	"testing"
 	"time"

 	"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
 	"k8s.io/ingress-gce/pkg/composite"
+	"k8s.io/kubernetes/pkg/apis/core"

 	apiv1 "k8s.io/api/core/v1"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	apitypes "k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/fake"
 	"k8s.io/client-go/tools/record"
+	negv1beta1 "k8s.io/ingress-gce/pkg/apis/svcneg/v1beta1"
 	backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned/fake"
 	"k8s.io/ingress-gce/pkg/context"
 	"k8s.io/ingress-gce/pkg/neg/readiness"
 	"k8s.io/ingress-gce/pkg/neg/types"
 	negtypes "k8s.io/ingress-gce/pkg/neg/types"
+	svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned"
+	negfake "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned/fake"
+
 	namer_util "k8s.io/ingress-gce/pkg/utils/namer"
 	"k8s.io/legacy-cloud-providers/gce"
 )
@@ -68,6 +75,10 @@ const (
 )

 func NewTestSyncerManager(kubeClient kubernetes.Interface) *syncerManager {
+	return NewTestSyncerManagerWithNegClient(kubeClient, nil)
+}
+
+func NewTestSyncerManagerWithNegClient(kubeClient kubernetes.Interface, negClient svcnegclient.Interface) *syncerManager {
 	backendConfigClient := backendconfigclient.NewSimpleClientset()
 	namer := namer_util.NewNamer(ClusterID, "")
 	ctxConfig := context.ControllerContextConfig{
@@ -75,13 +86,14 @@ func NewTestSyncerManager(kubeClient kubernetes.Interface) *syncerManager {
 		ResyncPeriod:          1 * time.Second,
 		DefaultBackendSvcPort: defaultBackend,
 	}
-	context := context.NewControllerContext(nil, kubeClient, backendConfigClient, nil, nil, gce.NewFakeGCECloud(gce.DefaultTestClusterValues()), namer, "" /*kubeSystemUID*/, ctxConfig)
+	context := context.NewControllerContext(nil, kubeClient, backendConfigClient, nil, negClient, gce.NewFakeGCECloud(gce.DefaultTestClusterValues()), namer, "" /*kubeSystemUID*/, ctxConfig)
 	manager := newSyncerManager(
 		namer,
 		record.NewFakeRecorder(100),
 		negtypes.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network"),
 		negtypes.NewFakeZoneGetter(),
+		negClient,
 		context.PodInformer.GetIndexer(),
 		context.ServiceInformer.GetIndexer(),
 		context.EndpointInformer.GetIndexer(),
@@ -678,6 +690,495 @@ func TestFilterCommonPorts(t *testing.T) {
 	}
 }

+func TestNegCRDEnsureAndStopSyncer(t *testing.T) {
+	t.Parallel()
+
+	negClient := negfake.NewSimpleClientset()
+
+	manager := NewTestSyncerManagerWithNegClient(fake.NewSimpleClientset(), negClient)
+	namer := manager.namer
+
+	svcName := "n1"
+	svcNamespace := "ns1"
+	customNegName := "neg-name"
+	var svcUID apitypes.UID = "svc-uid"
+	svc := &v1.Service{
+		TypeMeta: metav1.TypeMeta{
+			Kind:       "Service",
+			APIVersion: "v1",
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: svcNamespace,
+			Name:      svcName,
+		},
+	}
+	svc.SetUID(svcUID)
+	if err := manager.serviceLister.Add(svc); err != nil {
+		t.Errorf("failed to add sample service to service store: %s", err)
+	}
+
+	expectedPortInfoMap := negtypes.NewPortInfoMap(
+		svcNamespace,
+		svcName,
+		types.NewSvcPortTupleSet(
+			negtypes.SvcPortTuple{Port: port1, TargetPort: targetPort1},
+			negtypes.SvcPortTuple{Port: port2, TargetPort: targetPort2}),
+		namer, false,
+		map[negtypes.SvcPortTuple]string{negtypes.SvcPortTuple{Port: port1, TargetPort: targetPort1}: customNegName})
+
+	if err := manager.EnsureSyncers(svcNamespace, svcName, expectedPortInfoMap); err != nil {
+		t.Errorf("failed to ensure syncer %s/%s-%v: %v", svcNamespace, svcName, expectedPortInfoMap, err)
+	}
+
+	// check that all expected syncers are present and running
+	if len(manager.syncerMap) != 2 {
+		t.Errorf("manager should have two syncers, has %d", len(manager.syncerMap))
+	}
+
+	for key, syncer := range manager.syncerMap {
+		if syncer.IsStopped() || syncer.IsShuttingDown() {
+			t.Errorf("expected syncer %+v to be running.", key)
+		}
+	}
+	// validate portInfo
+	if len(manager.svcPortMap) != 1 {
+		t.Errorf("manager should have one service, has %d", len(manager.svcPortMap))
+	}
+
+	svcKey := serviceKey{namespace: svcNamespace, name: svcName}
+	if portInfoMap, svcFound := manager.svcPortMap[svcKey]; svcFound {
+		for key, expectedInfo := range expectedPortInfoMap {
+			if info, portFound := portInfoMap[key]; portFound {
+				if info.NegName != expectedInfo.NegName {
+					t.Errorf("expected NEG name %q, but got %q", expectedInfo.NegName, info.NegName)
+				}
+
+				if info.PortTuple.Port == port1 && info.NegName != customNegName {
+					t.Errorf("expected Neg name for port %d, to be %s, but was %s", port1, customNegName, info.NegName)
+				}
+			} else {
+				t.Errorf("expected port %d of service %q to be registered", key.ServicePort, svcKey.Key())
+			}
+			expectedConditionStatus := map[string]core.ConditionStatus{
+				negv1beta1.Initialized: core.ConditionUnknown,
+				negv1beta1.Synced:      core.ConditionUnknown,
+			}
+			neg, err := negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(svcKey.namespace).Get(context2.TODO(), expectedInfo.NegName, metav1.GetOptions{})
+			if err != nil {
+				t.Errorf("error getting neg from neg client: %s", err)
+			}
+			checkNegCR(t, neg, svcKey, svcUID, expectedInfo, expectedConditionStatus)
+		}

+	} else {
+		t.Errorf("expect service key %q to be registered", svcKey.Key())
+	}
+
+	if err := manager.EnsureSyncers(svcNamespace, svcName, expectedPortInfoMap); err != nil {
+		t.Errorf("Unexpected error when ensuring syncers a second time: %s", err)
+	}
+
+	svc2NegName := "different-service-neg"
+	testNeg := negv1beta1.ServiceNetworkEndpointGroup{ObjectMeta: metav1.ObjectMeta{Name: svc2NegName}}
+	_, err := negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(svcKey.namespace).Create(context2.TODO(), &testNeg, metav1.CreateOptions{})
+	if err != nil {
+		t.Errorf("error creating test neg: %s", err)
+	}
+
+	// make sure there is no leaking go routine
+	err = manager.StopSyncer(svcNamespace, svcName)
+	if err != nil {
+		t.Errorf("error stopping syncers and deleting neg crs: %s", err)
+	}
+	labelSelector := fmt.Sprintf("%s=%s", negtypes.NegCRServiceNameKey, svcName)
+	negs, err := negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(svcKey.namespace).List(context2.TODO(), metav1.ListOptions{LabelSelector: labelSelector})
+	if err != nil {
+		t.Errorf("error retrieving negs: %s", err)
+	}
+
+	if len(negs.Items) != 2 {
+		t.Errorf("expected to retrieve two negs, retrieved %d", len(negs.Items))
+	}
+
+	for _, neg := range negs.Items {
+		if neg.GetDeletionTimestamp().IsZero() {
+			t.Errorf("Expected neg cr %s deletion timestamp to be set", neg.Name)
+		}
+	}
+
+	neg, err := negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(svcKey.namespace).Get(context2.TODO(), svc2NegName, metav1.GetOptions{})
+	if err != nil {
+		t.Errorf("error retrieving neg %s: %s", svc2NegName, err)
+	}
+	if !neg.GetDeletionTimestamp().IsZero() {
+		t.Errorf("Expected neg cr %s deletion timestamp not to be set", svc2NegName)
+	}
+}
+
+func TestNegCRDDuplicateCreations(t *testing.T) {
+	t.Parallel()
+
+	svc1Name := "svc1"
+	svc2Name := "svc2"
+	namespace := "ns1"
+	customNegName := "neg-name"
+
+	var svc1UID apitypes.UID = "svc-1-uid"
+	var svc2UID apitypes.UID = "svc-2-uid"
+	svc1 := &v1.Service{
+		TypeMeta: metav1.TypeMeta{
+			Kind:       "Service",
+			APIVersion: "v1",
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: namespace,
+			Name:      svc1Name,
+		},
+	}
+	svc1.SetUID(svc1UID)
+
+	svc2 := svc1.DeepCopy()
+	svc2.Name = svc2Name
+	svc2.SetUID(svc2UID)
+
+	svcTuple1 := negtypes.SvcPortTuple{Port: port1, TargetPort: targetPort1}
+	svcTuple2 := negtypes.SvcPortTuple{Port: port2, TargetPort: targetPort1}
+
+	testCases := []struct {
+		desc              string
+		svc               *v1.Service
+		svcTuple          negtypes.SvcPortTuple
+		markedForDeletion bool
+		expectDeletion    bool
+		ensureOriginalSvc bool
+		expectErr         bool
+	}{
+		{desc: "same service, same port configuration, original cr is not marked for deletion",
+			svc:               svc1,
+			svcTuple:          svcTuple1,
+			markedForDeletion: false,
+			expectDeletion:    false,
+			ensureOriginalSvc: false,
+			expectErr:         false,
+		},
+		{desc: "same service, same port configuration, original cr is marked for deletion",
+			svc:               svc1,
+			svcTuple:          svcTuple1,
+			markedForDeletion: true,
+			expectDeletion:    false,
+			ensureOriginalSvc: false,
+			expectErr:         false,
+		},
+		{desc: "same service, different port configuration, original cr is not marked for deletion",
+			svc:               svc1,
+			svcTuple:          svcTuple2,
+			markedForDeletion: false,
+			expectDeletion:    true,
+			ensureOriginalSvc: false,
+			expectErr:         false,
+		},
+		{desc: "same service, different port configuration, original cr is marked for deletion",
+			svc:               svc1,
+			svcTuple:          svcTuple2,
+			markedForDeletion: true,
+			expectDeletion:    false,
+			ensureOriginalSvc: false,
+			expectErr:         false,
+		},
+		{desc: "different service name, original service is still desired",
+			svc:               svc2,
+			svcTuple:          svcTuple1,
+			markedForDeletion: false,
+			expectDeletion:    false,
+			ensureOriginalSvc: true,
+			expectErr:         true,
+		},
+		{desc: "different service name",
+			svc:               svc2,
+			svcTuple:          svcTuple1,
+			markedForDeletion: false,
+			expectDeletion:    true,
+			ensureOriginalSvc: false,
+			expectErr:         false,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.desc, func(t *testing.T) {

+			negClient := negfake.NewSimpleClientset()

+			manager := NewTestSyncerManagerWithNegClient(fake.NewSimpleClientset(), negClient)
+			namer := manager.namer
+			svcTupleSet := types.NewSvcPortTupleSet(svcTuple1)
+			portInfoMap := negtypes.NewPortInfoMap(
+				namespace1,
+				svc1Name,
+				svcTupleSet,
+				namer, false,
+				map[negtypes.SvcPortTuple]string{svcTuple1: customNegName},
+			)
+
+			svcKey := serviceKey{namespace: namespace, name: svc1Name}
+			originalPortInfo := portInfoMap[negtypes.PortInfoMapKey{ServicePort: svcTuple1.Port, Subset: ""}]
+
+			var expectedConditionStatus map[string]core.ConditionStatus
+
+			if tc.ensureOriginalSvc {
+				if err := manager.serviceLister.Add(svc1); err != nil {
+					t.Errorf("failed to add sample service to service store: %s", err)
+				}
+				if err := manager.EnsureSyncers(namespace, svc1Name, portInfoMap); err != nil {
+					t.Errorf("failed to ensure service: %s", err)
+				}
+				expectedConditionStatus = map[string]core.ConditionStatus{
+					negv1beta1.Initialized: core.ConditionUnknown,
+					negv1beta1.Synced:      core.ConditionUnknown,
+				}

+			} else {

+				testNeg := createNegCR(svc1, svcKey, originalPortInfo, core.ConditionTrue, core.ConditionUnknown)
+				if tc.markedForDeletion {
+					deletionTS := metav1.Now()
+					testNeg.SetDeletionTimestamp(&deletionTS)
+				}
+				_, err := negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(namespace1).Create(context2.TODO(), &testNeg, metav1.CreateOptions{})
+				if err != nil {
+					t.Errorf("error creating test neg: %s", err)
+				}
+				expectedConditionStatus = map[string]core.ConditionStatus{
+					negv1beta1.Initialized: core.ConditionTrue,
+					negv1beta1.Synced:      core.ConditionUnknown,
+				}
+			}
+
+			if err := manager.serviceLister.Add(tc.svc); err != nil {
+				t.Errorf("failed to add sample service to service store: %s", err)
+			}
+
+			portInfoMap = negtypes.NewPortInfoMap(
+				tc.svc.Namespace,
+				tc.svc.Name,
+				types.NewSvcPortTupleSet(tc.svcTuple),
+				namer, false,
+				map[negtypes.SvcPortTuple]string{tc.svcTuple: customNegName},
+			)
+
+			err := manager.EnsureSyncers(tc.svc.Namespace, tc.svc.Name, portInfoMap)
+			if tc.expectErr && err == nil {
+				t.Errorf("expected error when ensuring syncer %s/%s %+v", tc.svc.Namespace, tc.svc.Name, portInfoMap)
+			} else if !tc.expectErr && err != nil {
+				t.Errorf("failed to ensure syncer %s/%s %+v: %s", tc.svc.Namespace, tc.svc.Name, portInfoMap, err)
+			}
+
+			negs, err := negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(namespace).List(context2.TODO(), metav1.ListOptions{})
+			if err != nil {
+				t.Errorf("error retrieving negs: %s", err)
+			}
+			if len(negs.Items) != 1 {
+				t.Errorf("expected to retrieve one neg, retrieved %d", len(negs.Items))
+			}
+
+			// Expect that the NEG CR is not updated
+			neg, err := negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(namespace).Get(context2.TODO(), customNegName, metav1.GetOptions{})
+			if err != nil {
+				t.Errorf("error getting neg from neg client: %s", err)
+			}
+			checkNegCR(t, neg, svcKey, svc1.UID, originalPortInfo, expectedConditionStatus)
+
+			if (tc.expectDeletion || tc.markedForDeletion) && neg.GetDeletionTimestamp().IsZero() {
+				t.Errorf("expected neg %s/%s/%s to be marked for deletion", svc1Name, namespace1, customNegName)
+			}
+
+			negs, err = negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(namespace2).List(context2.TODO(), metav1.ListOptions{})
+			if err != nil {
+				t.Errorf("error retrieving negs: %s", err)
+			}
+
+			if len(negs.Items) != 0 {
+				t.Errorf("expected to retrieve no negs, retrieved %d for namespace %s", len(negs.Items), namespace2)
+			}
+
+			// make sure there is no leaking go routine
+			err = manager.StopSyncer(namespace1, svc1Name)
+			if err != nil {
+				t.Errorf("error stopping syncers and deleting neg crs: %s", err)
+			}
+			err = manager.StopSyncer(namespace2, svc2Name)
+			if err != nil {
+				t.Errorf("error stopping syncers and deleting neg crs: %s", err)
+			}

+		})
+	}
+}
+
+func TestNegCRDDuplicateDeletions(t *testing.T) {
+	t.Parallel()
+
+	negClient := negfake.NewSimpleClientset()
+
+	manager := NewTestSyncerManagerWithNegClient(fake.NewSimpleClientset(), negClient)
+	namer := manager.namer
+
+	svcName := "n1"
+	svcNamespace := "ns1"
+	customNegName := "neg-name"
+	svcKey := serviceKey{namespace: svcNamespace, name: svcName}
+	var svcUID apitypes.UID = "svc-uid"
+	svc := &v1.Service{
+		TypeMeta: metav1.TypeMeta{
+			Kind:       "Service",
+			APIVersion: "v1",
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: svcNamespace,
+			Name:      svcName,
+		},
+	}
+	svc.SetUID(svcUID)
+	if err := manager.serviceLister.Add(svc); err != nil {
+		t.Errorf("failed to add sample service to service store: %s", err)
+	}
+
+	expectedPortInfoMap := negtypes.NewPortInfoMap(
+		svcNamespace,
+		svcName,
+		types.NewSvcPortTupleSet(
+			negtypes.SvcPortTuple{Port: port1, TargetPort: targetPort1},
+			negtypes.SvcPortTuple{Port: port2, TargetPort: targetPort2}),
+		namer, false,
+		map[negtypes.SvcPortTuple]string{negtypes.SvcPortTuple{Port: port1, TargetPort: targetPort1}: customNegName},
+	)
+
+	if err := manager.EnsureSyncers(svcNamespace, svcName, expectedPortInfoMap); err != nil {
+		t.Errorf("failed to ensure syncer %s/%s-%v: %v", svcNamespace, svcName, expectedPortInfoMap, err)
+	}
+
+	labelSelector := fmt.Sprintf("%s=%s", negtypes.NegCRServiceNameKey, svcName)
+	negs, err := negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(svcKey.namespace).List(context2.TODO(), metav1.ListOptions{LabelSelector: labelSelector})
+	if err != nil {
+		t.Errorf("error retrieving negs: %s", err)
+	}
+
+	if len(negs.Items) != 2 {
+		t.Errorf("expected to retrieve two negs, retrieved %d", len(negs.Items))
+	}
+
+	deletionTS := metav1.Now()
+	neg1 := negs.Items[0]
+	neg2 := negs.Items[1]
+	neg1.SetDeletionTimestamp(&deletionTS)
+	_, err = negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(svcKey.namespace).Update(context2.TODO(), &neg1, metav1.UpdateOptions{})
+	if err != nil {
+		t.Errorf("error setting deletion ts for neg crd: %s", err)
+	}
+	err = negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(svcKey.namespace).Delete(context2.TODO(), neg2.Name, metav1.DeleteOptions{})
+	if err != nil {
+		t.Errorf("error deleting neg crd: %s", err)
+	}
+
+	// make sure deletion does not try to re-delete negs
+	err = manager.StopSyncer(svcNamespace, svcName)
+	if err != nil {
+		t.Errorf("error stopping syncers and deleting neg crs: %s", err)
+	}
+	negs, err = negClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(svcKey.namespace).List(context2.TODO(), metav1.ListOptions{LabelSelector: labelSelector})
+	if err != nil {
+		t.Errorf("error retrieving negs: %s", err)
+	}
+
+	if len(negs.Items) != 1 {
+		t.Errorf("expected to retrieve one neg, retrieved %d", len(negs.Items))
+	}
+
+	if !negs.Items[0].GetDeletionTimestamp().Equal(&deletionTS) {
+		t.Errorf("Expected neg cr %s deletion timestamp to be unchanged", negs.Items[0].Name)
+	}
+}
+
+// Check that NEG CR Conditions exist and are in the expected condition
+func checkNegCR(t *testing.T, neg *negv1beta1.ServiceNetworkEndpointGroup, svcKey serviceKey, svcUID apitypes.UID, expectedInfo negtypes.PortInfo, expectedConditionStatus map[string]core.ConditionStatus) {

+	if neg.GetNamespace() != svcKey.namespace {
+		t.Errorf("neg namespace is %s, expected %s", neg.GetNamespace(), svcKey.namespace)
+	}
+
+	// TODO: Add check for finalizer after Neg CRD Garbage Collection is implemented.
+
+	// check labels
+	labels := neg.GetLabels()
+	if len(labels) != 3 {
+		t.Errorf("Expected 3 labels for neg %s, found %d", neg.Name, len(labels))
+	} else {

+		if val, ok := labels[negtypes.NegCRManagedByKey]; !ok || val != negtypes.NegCRControllerValue {
+			t.Errorf("Expected neg to have label %s with value %s, found %s", negtypes.NegCRManagedByKey, negtypes.NegCRControllerValue, val)
+		}

+		if val, ok := labels[negtypes.NegCRServiceNameKey]; !ok || val != svcKey.name {
+			t.Errorf("Expected neg to have label %s with value %s, found %s", negtypes.NegCRServiceNameKey, svcKey.name, val)
+		}

+		if val, ok := labels[negtypes.NegCRServicePortKey]; !ok || val != fmt.Sprint(expectedInfo.PortTuple.Port) {
+			t.Errorf("Expected neg to have label %s with value %d, found %s", negtypes.NegCRServicePortKey, expectedInfo.PortTuple.Port, val)
+		}
+	}
+	if len(neg.Status.Conditions) != 2 {
+		t.Errorf("Expected neg to have 2 conditions, found %d", len(neg.Status.Conditions))
+	} else {

+		expectedConditions := sets.NewString(string(negv1beta1.Initialized), string(negv1beta1.Synced))
+		foundConditions := sets.NewString()
+		for _, condition := range neg.Status.Conditions {
+			foundConditions.Insert(string(condition.Type))
+			expectedStatus, ok := expectedConditionStatus[condition.Type]
+			if !ok {
+				t.Errorf("Unexpected neg condition type %s found", condition.Type)
+				continue
+			}
+			if condition.Status != expectedStatus {
+				t.Errorf("Expected neg condition %s to be in state %s, not %s", condition.Type, expectedStatus, condition.Status)
+			}

+			if condition.LastTransitionTime.IsZero() {
+				t.Errorf("Expected neg condition %s to have last transition time set", condition.Type)
+			}

+			if condition.Reason == "" {
+				t.Errorf("Expected neg condition %s to have a non empty reason", condition.Type)
+			}
+		}

+		if !expectedConditions.Equal(foundConditions) {
+			t.Errorf("Expected neg conditions to be %+v, found %+v", expectedConditions.List(), foundConditions.List())
+		}

+		if !neg.Status.LastSyncTime.IsZero() {
+			t.Errorf("Expected neg last sync time not to be set yet")
+		}
+	}
+
+	ownerRefs := neg.GetOwnerReferences()
+	if len(ownerRefs) != 1 {
+		t.Errorf("Expected neg to have one owner ref, has %d", len(ownerRefs))
+	} else {

+		if ownerRefs[0].Name != svcKey.name {
+			t.Errorf("Expected neg owner ref to have name %s, instead has %s", svcKey.name, ownerRefs[0].Name)
+		}

+		if ownerRefs[0].UID != svcUID {
+			t.Errorf("Expected neg owner ref to have UID %s, instead has %s", svcUID, ownerRefs[0].UID)
+		}

+		if *ownerRefs[0].BlockOwnerDeletion != false {
+			t.Errorf("Expected neg owner ref not to block owner deletion")
+		}
+	}
+}
+
 // populateSyncerManager for testing
 func populateSyncerManager(manager *syncerManager, kubeClient kubernetes.Interface) {
 	namer := manager.namer
@@ -822,3 +1323,40 @@ func portInfoUnion(p1, p2 negtypes.PortInfoMap) negtypes.PortInfoMap {
 	p1.Merge(p2)
 	return p1
 }
+
+func createNegCR(service *v1.Service, svcKey serviceKey, portInfo negtypes.PortInfo, initializedConditionStatus, syncedConditionStatus core.ConditionStatus) negv1beta1.ServiceNetworkEndpointGroup {
+	now := metav1.Now()
+	ownerReference := metav1.NewControllerRef(service, service.GroupVersionKind())
+	*ownerReference.BlockOwnerDeletion = false
+	labels := map[string]string{
+		negtypes.NegCRManagedByKey:   negtypes.NegCRControllerValue,
+		negtypes.NegCRServiceNameKey: svcKey.name,
+		negtypes.NegCRServicePortKey: fmt.Sprint(portInfo.PortTuple.Port),
+	}
+
+	// TODO: Add finalizer once NEG CRD GC is implemented
+	return negv1beta1.ServiceNetworkEndpointGroup{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:            portInfo.NegName,
+			Namespace:       svcKey.namespace,
+			OwnerReferences: []metav1.OwnerReference{*ownerReference},
+			Labels:          labels,
+		},
+		Status: negv1beta1.ServiceNetworkEndpointGroupStatus{
+			Conditions: []negv1beta1.Condition{
+				negv1beta1.Condition{
+					Type:               negv1beta1.Initialized,
+					Status:             initializedConditionStatus,
+					LastTransitionTime: now,
+					Reason:             "condition-transition-reason",
+				},
+				negv1beta1.Condition{
+					Type:               negv1beta1.Synced,
+					Status:             syncedConditionStatus,
+					LastTransitionTime: now,
+					Reason:             "condition-transition-reason",
+				},
+			},
+		},
+	}
+}
diff --git a/pkg/neg/types/interfaces.go b/pkg/neg/types/interfaces.go
index 452354be83..fb8eaf43ba 100644
--- a/pkg/neg/types/interfaces.go
+++ b/pkg/neg/types/interfaces.go
@@ -18,7 +18,7 @@ package types

 import (
 	"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/ingress-gce/pkg/composite"
 )
@@ -70,7 +70,7 @@ type NegSyncerManager interface {
 	// portMap is a map of ServicePort Port to TargetPort
 	EnsureSyncers(namespace, name string, portMap PortInfoMap) error
 	// StopSyncer stops all syncers related to the service. This call is asynchronous. It will not wait for all syncers to stop.
-	StopSyncer(namespace, name string)
+	StopSyncer(namespace, name string) error
 	// Sync signals all syncers related to the service to sync. This call is asynchronous.
 	Sync(namespace, name string)
 	// SyncNodes signals all syncers watching nodes to sync. This call is asynchronous.
diff --git a/pkg/neg/types/types.go b/pkg/neg/types/types.go
index c71c7ffb7a..b1efdbb5a7 100644
--- a/pkg/neg/types/types.go
+++ b/pkg/neg/types/types.go
@@ -41,6 +41,14 @@ const (
 	L7Mode        = EndpointsCalculatorMode("L7")
 	L4LocalMode   = EndpointsCalculatorMode("L4, ExternalTrafficPolicy:Local")
 	L4ClusterMode = EndpointsCalculatorMode("L4, ExternalTrafficPolicy:Cluster")
+
+	// These keys are to be used as label keys for NEG CRs when enabled

+	NegCRManagedByKey   = "networking.gke.io/managed-by"
+	NegCRServiceNameKey = "networking.gke.io/service-name"
+	NegCRServicePortKey = "networking.gke.io/service-port"
+	// NegCRControllerValue is used as the value for the managed-by label on NEG CRs when enabled.
+ NegCRControllerValue = "neg-controller" ) // SvcPortTuple is the tuple representing one service port
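
--
Reviewer note, not part of the patch: the three label keys added in types.go form the lookup
contract for the NEG CRs this change introduces. Below is a minimal sketch of how a consumer
could list the CRs the manager created for a given service, using only identifiers that appear
in this diff (svcnegclient, negv1beta1, NegCRServiceNameKey); the helper function itself is
hypothetical, mirroring the label selector that ensureDeleteNegServiceCRs builds:

	package main

	import (
		"context"
		"fmt"

		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
		"k8s.io/apimachinery/pkg/labels"
		negv1beta1 "k8s.io/ingress-gce/pkg/apis/svcneg/v1beta1"
		negtypes "k8s.io/ingress-gce/pkg/neg/types"
		svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned"
	)

	// listNegCRsForService returns the NEG CRs labeled as belonging to the given
	// service, i.e. the same set StopSyncer marks for deletion via
	// ensureDeleteNegServiceCRs.
	func listNegCRsForService(client svcnegclient.Interface, namespace, name string) ([]negv1beta1.ServiceNetworkEndpointGroup, error) {
		selector := labels.Set(map[string]string{negtypes.NegCRServiceNameKey: name}).String()
		negs, err := client.NetworkingV1beta1().ServiceNetworkEndpointGroups(namespace).List(context.Background(), metav1.ListOptions{LabelSelector: selector})
		if err != nil {
			return nil, fmt.Errorf("failed to list NEG CRs for service %s/%s: %v", namespace, name, err)
		}
		return negs.Items, nil
	}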