diff --git a/cmd/e2e-test/asm_test.go b/cmd/e2e-test/asm_test.go
new file mode 100644
index 0000000000..21fd55a2c3
--- /dev/null
+++ b/cmd/e2e-test/asm_test.go
@@ -0,0 +1,233 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"strings"
+	"testing"
+	"time"
+
+	istioV1alpha3 "istio.io/api/networking/v1alpha3"
+	apiv1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/ingress-gce/pkg/e2e"
+	"k8s.io/klog"
+)
+
+const (
+	asmConfigNamespace          = "kube-system"
+	asmConfigName               = "ingress-controller-asm-cm-config"
+	negControllerRestartTimeout = 5 * time.Minute
+)
+
+// TestASMConfig tests enabling and disabling ASM mode; it can't run in parallel with other tests.
+func TestASMConfig(t *testing.T) {
+	Framework.RunWithSandbox("TestASMConfig", t, func(t *testing.T, s *e2e.Sandbox) {
+		for _, tc := range []struct {
+			desc                string
+			configMap           map[string]string
+			wantConfigMapEvents []string
+		}{
+			{
+				desc:                "An invalid ConfigMap value is treated as disabled",
+				configMap:           map[string]string{"enable-asm": "INVALID"},
+				wantConfigMapEvents: []string{"The map provided a unvalid value for field: enable-asm, value: INVALID"},
+			},
+			{
+				desc:                "An unknown ConfigMap field is treated as disabled",
+				configMap:           map[string]string{"enable-unknow-feild": "INVALID1"},
+				wantConfigMapEvents: []string{"The map contains a unknown key-value pair: enable-unknow-feild:INVALID1"},
+			},
+			{
+				desc:                "Setting enable-asm to true should restart the controller",
+				configMap:           map[string]string{"enable-asm": "true"},
+				wantConfigMapEvents: []string{"ConfigMapConfigController: Get a update on the ConfigMapConfig, Restarting Ingress controller"},
+			},
+			// TODO(koonwah): The case below is not fully tested; the NEG controller should be updated to emit an enable-asm event on the target ConfigMap.
+			{
+				desc:      "An invalid ConfigMap value is treated as disabled",
+				configMap: map[string]string{"enable-asm": "INVALID2"},
+				wantConfigMapEvents: []string{"The map provided a unvalid value for field: enable-asm, value: INVALID2",
+					"ConfigMapConfigController: Get a update on the ConfigMapConfig, Restarting Ingress controller"},
+			},
+		} {
+			t.Logf("Running test case: %s", tc.desc)
+			if err := e2e.EnsureConfigMap(s, asmConfigNamespace, asmConfigName, tc.configMap); err != nil {
+				t.Errorf("Failed to ensure ConfigMap, error: %s", err)
+			}
+			if err := e2e.WaitConfigMapEvents(s, asmConfigNamespace, asmConfigName, tc.wantConfigMapEvents, negControllerRestartTimeout); err != nil {
+				t.Fatalf("Failed to get events: %v; error: %v", strings.Join(tc.wantConfigMapEvents, ";"), err)
+			}
+		}
+	})
+}
+
+// TestASMServiceAndDestinationRule tests Service- and DestinationRule-level NEGs; it can't run in parallel with other tests.
+func TestASMServiceAndDestinationRule(t *testing.T) {
+	// Reference the imported types so the imports stay used.
+	_ = istioV1alpha3.DestinationRule{}
+	_ = apiv1.ComponentStatus{}
+
+	// This test needs two namespaces; one of them will be listed in asm-skip-namespaces.
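+	// Nesting RunWithSandbox below is how the test obtains the two sandboxes:
+	// the outer one (sSkip) is added to asm-skip-namespaces, while the inner
+	// one (s) is processed by the NEG controller as usual.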
+	Framework.RunWithSandbox("TestASMServiceAndDestinationRule", t, func(t *testing.T, sSkip *e2e.Sandbox) {
+		Framework.RunWithSandbox("TestASMServiceAndDestinationRule", t, func(t *testing.T, s *e2e.Sandbox) {
+			// Enable ASM mode.
+			ctx := context.Background()
+
+			asmConfig := map[string]string{"enable-asm": "true",
+				"asm-skip-namespaces": fmt.Sprintf("kube-system,istio-system,%s", sSkip.Namespace)}
+			if err := e2e.EnsureConfigMap(s, asmConfigNamespace, asmConfigName,
+				asmConfig); err != nil {
+				t.Error(err)
+			}
+
+			var porterPort int32 = 80
+			svcName := "service"
+			svcSkipName := "service-skip"
+
+			// Create the Deployments used by the Service and the DestinationRules.
+			// Each Deployment carries two label pairs: {"app": "porter", "version": "*"}.
+			// The different versions are used as DestinationRule subsets.
+			for _, deployment := range []struct {
+				deploymentName string
+				replicas       int32
+				version        string
+			}{
+				{deploymentName: "deployment-v1", replicas: 1, version: "v1"},
+				{deploymentName: "deployment-v2", replicas: 2, version: "v2"},
+				{deploymentName: "deployment-v3", replicas: 3, version: "v3"},
+			} {
+				if err := e2e.CreatePorterDeployment(s, deployment.deploymentName, deployment.replicas, deployment.version); err != nil {
+					t.Errorf("Failed to create deployment, error: %s", err)
+				}
+			}
+
+			// Create the Services and validate their NEG annotations. The NEG controller
+			// shouldn't create NEGs for Services in a skip namespace.
+			for _, svc := range []struct {
+				desc            string
+				svcName         string
+				inSkipNamespace bool
+			}{
+				{desc: "NEG controller should create NEGs for all ports of a service by default", svcName: svcName, inSkipNamespace: false},
+				{desc: "NEG controller shouldn't create NEGs for any port of a service in a skip namespace", svcName: svcSkipName, inSkipNamespace: true},
+			} {
+				t.Logf("Running test case: %s", svc.desc)
+				sandbox := s
+				noPresentTest := false
+				if svc.inSkipNamespace {
+					sandbox = sSkip
+					noPresentTest = true
+				}
+				if err := e2e.CreatePorterService(sandbox, svc.svcName); err != nil {
+					t.Errorf("Failed to create service, error: %s", err)
+				}
+
+				// Check the Service's NEG annotation.
+				negStatus, err := e2e.WaitForNegStatus(sandbox, svc.svcName, []string{strconv.Itoa(int(porterPort))}, noPresentTest)
+				if err != nil {
+					t.Errorf("Failed to wait for Service NEG annotation, error: %s", err)
+				}
+				if svc.inSkipNamespace {
+					if negStatus != nil {
+						t.Errorf("Service: %s/%s is in the ASM skip namespace and shouldn't have a NEG status. ASM Config: %v, NEGStatus got: %v",
+							sandbox.Namespace, svc.svcName, asmConfig, negStatus)
+					}
+				} else {
+					if negName, ok := negStatus.NetworkEndpointGroups[strconv.Itoa(int(porterPort))]; ok {
+						// No backend pod exists, so the NEG has 0 endpoints.
+						if err := e2e.WaitForNegs(ctx, Framework.Cloud, negName, negStatus.Zones, false, 0); err != nil {
+							t.Errorf("Failed to wait for NEGs, error: %s", err)
+						}
+					} else {
+						t.Fatalf("Service annotation doesn't contain the desired NEG status, want: %d, have: %v", porterPort, negStatus.NetworkEndpointGroups)
+					}
+				}
+			}
+
+			for _, tc := range []struct {
+				desc                   string
+				destinationRuleName    string
+				subsetEndpointCountMap map[string]int
+				crossNamespace         bool // If set, the DestinationRule uses the full Host (Service) name to refer to a Service in a different namespace.
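+				// For example, with svcName "service" living in namespace s.Namespace, the
+				// cross-namespace host below becomes "service.<s.Namespace>.svc.cluster.local".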
+			}{
+				{desc: "NEG controller should create NEGs for the destinationrule", destinationRuleName: "porter-destinationrule", subsetEndpointCountMap: map[string]int{"v1": 1, "v2": 2, "v3": 3}, crossNamespace: false},
+				{desc: "NEG controller should update NEGs for the destinationrule", destinationRuleName: "porter-destinationrule", subsetEndpointCountMap: map[string]int{"v1": 1, "v2": 2}, crossNamespace: false},
+				{desc: "NEG controller should create NEGs for a cross-namespace destinationrule", destinationRuleName: "porter-destinationrule-1", subsetEndpointCountMap: map[string]int{"v1": 1}, crossNamespace: true},
+			} {
+				t.Logf("Running test case: %s", tc.desc)
+				sandbox := s
+				drHost := svcName
+				// crossNamespace tests DestinationRules that refer to a Service located in a different namespace.
+				if tc.crossNamespace {
+					sandbox = sSkip
+					drHost = fmt.Sprintf("%s.%s.svc.cluster.local", svcName, s.Namespace)
+				}
+
+				versions := []string{}
+				for k := range tc.subsetEndpointCountMap {
+					versions = append(versions, k)
+				}
+				if err := e2e.EnsurePorterDestinationRule(sandbox, tc.destinationRuleName, drHost, versions); err != nil {
+					t.Errorf("Failed to ensure DestinationRule, error: %s", err)
+				}
+
+				// One DestinationRule should have count(NEGs) = count(subsets) * count(ports).
+				dsNEGStatus, err := e2e.WaitDestinationRuleAnnotation(sandbox, sandbox.Namespace, tc.destinationRuleName, len(tc.subsetEndpointCountMap)*1, 5*time.Minute)
+				if err != nil {
+					t.Fatalf("Failed to validate the NEG count, error: %s", err)
+				}
+
+				zones := dsNEGStatus.Zones
+				for subsetVersion, endpointCount := range tc.subsetEndpointCountMap {
+					negNames, ok := dsNEGStatus.NetworkEndpointGroups[subsetVersion]
+					if !ok {
+						t.Fatalf("DestinationRule annotation doesn't contain the desired NEG status, want: %s, have: %v", subsetVersion, dsNEGStatus.NetworkEndpointGroups)
+					}
+					negName, ok := negNames[strconv.Itoa(int(porterPort))]
+					if !ok {
+						t.Fatalf("DestinationRule annotation doesn't contain the desired NEG status, want: %d, have: %v", porterPort, negNames)
+					}
+					if err := e2e.WaitForNegs(ctx, Framework.Cloud, negName, zones, false, endpointCount); err != nil {
+						t.Errorf("Failed to wait for NEGs, error: %s", err)
+					}
+				}
+			}
+		})
+	})
+}
+
+func TestNoIstioASM(t *testing.T) {
+	Framework.RunWithSandbox("TestASMConfigOnNoIstioCluster", t, func(t *testing.T, s *e2e.Sandbox) {
+		cm := map[string]string{"enable-asm": "true"}
+		wantConfigMapEvents := []string{"ConfigMapConfigController: Get a update on the ConfigMapConfig, Restarting Ingress controller",
+			"Cannot find DestinationRule CRD, disabling ASM Mode, please check Istio setup."}
+
+		if err := e2e.EnsureConfigMap(s, asmConfigNamespace, asmConfigName, cm); err != nil {
+			t.Error(err)
+		}
+		defer e2e.DeleteConfigMap(s, asmConfigNamespace, asmConfigName)
+		if err := e2e.WaitConfigMapEvents(s, asmConfigNamespace, asmConfigName, wantConfigMapEvents, negControllerRestartTimeout); err != nil {
+			t.Fatalf("Failed to get events: %v; error: %v", wantConfigMapEvents, err)
+		}
+
+		if err := wait.Poll(5*time.Second, 1*time.Minute, func() (bool, error) {
+			cmData, err := e2e.GetConfigMap(s, asmConfigNamespace, asmConfigName)
+			if err != nil {
+				return false, err
+			}
+			if val, ok := cmData["enable-asm"]; ok && val == "false" {
+				return true, nil
+			}
+			// Returning an error here would abort the poll; keep polling until the controller flips enable-asm to false.
+			klog.Infof("ConfigMap: %s/%s, enable-asm is not false yet. Value: %v", asmConfigNamespace, asmConfigName, cmData)
+			return false, nil
+		}); err != nil {
+			t.Fatalf("Failed to validate enable-asm = false, error: %s", err)
+		}
+	})
+}
diff --git a/cmd/e2e-test/main_test.go b/cmd/e2e-test/main_test.go
index 92e217c102..08f1e2477f 100644
--- a/cmd/e2e-test/main_test.go
+++ b/cmd/e2e-test/main_test.go
@@ -49,6 +49,7 @@ var (
 		handleSIGINT        bool
 		gceEndpointOverride string
 		createILBSubnet     bool
+		enableIstio         bool
 	}
 
 	Framework *e2e.Framework
@@ -71,6 +72,7 @@ func init() {
 	flag.BoolVar(&flags.handleSIGINT, "handleSIGINT", true, "catch SIGINT to perform clean")
 	flag.StringVar(&flags.gceEndpointOverride, "gce-endpoint-override", "", "If set, talks to a different GCE API Endpoint. By default it talks to https://www.googleapis.com/compute/v1/")
 	flag.BoolVar(&flags.createILBSubnet, "createILBSubnet", false, "If set, creates a proxy subnet for the L7 ILB")
+	flag.BoolVar(&flags.enableIstio, "enable-istio", false, "Set to true if Istio is enabled on the cluster.")
 }
 
 // TestMain is the entrypoint for the end-to-end test suite. This is where
@@ -126,6 +128,7 @@ func TestMain(m *testing.M) {
 		DestroySandboxes:    flags.destroySandboxes,
 		GceEndpointOverride: flags.gceEndpointOverride,
 		CreateILBSubnet:     flags.createILBSubnet,
+		EnableIstio:         flags.enableIstio,
 	})
 	if flags.handleSIGINT {
 		Framework.CatchSIGINT()
diff --git a/cmd/e2e-test/neg_test.go b/cmd/e2e-test/neg_test.go
index 4ee4f75b6f..902f3eb978 100644
--- a/cmd/e2e-test/neg_test.go
+++ b/cmd/e2e-test/neg_test.go
@@ -21,7 +21,7 @@ import (
 	"testing"
 
 	apps "k8s.io/api/apps/v1"
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/api/networking/v1beta1"
 	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/apimachinery/pkg/util/sets"
@@ -338,7 +338,7 @@ func TestNEGSyncEndpoints(t *testing.T) {
 	}
 
 	// validate neg status
-	negStatus, err := e2e.WaitForNegStatus(s, svcName, tc.expectServicePort.List())
+	negStatus, err := e2e.WaitForNegStatus(s, svcName, tc.expectServicePort.List(), false)
 	if err != nil {
 		t.Fatalf("error waiting for NEG status to update: %v", err)
 	}
diff --git a/cmd/e2e-test/upgrade/neg.go b/cmd/e2e-test/upgrade/neg.go
index e60e496d5b..c6cfd49a98 100644
--- a/cmd/e2e-test/upgrade/neg.go
+++ b/cmd/e2e-test/upgrade/neg.go
@@ -18,10 +18,11 @@ package upgrade
 
 import (
 	"context"
-	"k8s.io/api/core/v1"
+	"testing"
+
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/ingress-gce/pkg/annotations"
 	"k8s.io/ingress-gce/pkg/e2e"
-	"testing"
 )
 
 var (
@@ -105,7 +106,7 @@ func (sn *StandaloneNeg) scaleAndValidate(replicas int32) {
 
 // validate checks that the NEG status annotation and the corresponding NEGs are correctly configured.
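+// A sketch of the annotation shape this checks (key per annotations.NEGStatusKey;
+// the NEG name and zone here are illustrative):
+//
+//	cloud.google.com/neg-status: {"network_endpoint_groups":{"80":"k8s1-...-svc-80-..."},"zones":["us-central1-a"]}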
 func (sn *StandaloneNeg) validate(replicas int32) {
 	// validate neg status
-	negStatus, err := e2e.WaitForNegStatus(sn.s, svcName, expectServicePort)
+	negStatus, err := e2e.WaitForNegStatus(sn.s, svcName, expectServicePort, false)
 	if err != nil {
 		sn.t.Fatalf("error waiting for NEG status to update: %v", err)
 	}
diff --git a/pkg/e2e/fixtures.go b/pkg/e2e/fixtures.go
index b49cbf2654..50b3730f35 100644
--- a/pkg/e2e/fixtures.go
+++ b/pkg/e2e/fixtures.go
@@ -21,17 +21,24 @@ package e2e
 import (
 	"context"
 	"fmt"
-	"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
-	"k8s.io/ingress-gce/pkg/utils"
 	"math/rand"
 	"net/http"
 	"reflect"
 
+	"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
+	istioV1alpha3 "istio.io/api/networking/v1alpha3"
+	apiappsv1 "k8s.io/api/apps/v1"
+	apiv1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/ingress-gce/pkg/utils"
+
 	"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
 	computebeta "google.golang.org/api/compute/v0.beta"
 	"google.golang.org/api/compute/v1"
 	apps "k8s.io/api/apps/v1"
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/api/networking/v1beta1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
@@ -42,6 +49,7 @@ import (
 
 const (
 	echoheadersImage = "gcr.io/k8s-ingress-image-push/ingress-gce-echo-amd64:master"
+	porterPort       = 80
 )
 
 var ErrSubnetExists = fmt.Errorf("ILB subnet in region already exists")
@@ -320,3 +328,109 @@ func DeleteILBSubnet(s *Sandbox, name string) error {
 	klog.V(2).Infof("Deleting ILB Subnet %q", name)
 	return s.f.Cloud.BetaSubnetworks().Delete(context.Background(), meta.RegionalKey(name, s.f.Region))
 }
+
+// CreatePorterDeployment creates a Deployment running the porter image.
+func CreatePorterDeployment(s *Sandbox, name string, replicas int32, version string) error {
+	env := fmt.Sprintf("SERVE_PORT_%d", porterPort)
+	labels := map[string]string{"app": "porter", "version": version}
+	deployment := apiappsv1.Deployment{
+		ObjectMeta: metav1.ObjectMeta{Namespace: s.Namespace, Name: name},
+		Spec: apiappsv1.DeploymentSpec{
+			Replicas: &replicas,
+			Selector: &metav1.LabelSelector{MatchLabels: labels},
+			Template: apiv1.PodTemplateSpec{
+				ObjectMeta: metav1.ObjectMeta{Labels: labels},
+				Spec: apiv1.PodSpec{
+					Containers: []apiv1.Container{
+						{
+							Name:  "hostname",
+							Image: "gcr.io/kubernetes-e2e-test-images/porter-alpine:1.0",
+							Env:   []apiv1.EnvVar{{Name: env, Value: env}},
+							Ports: []apiv1.ContainerPort{{Name: "server", ContainerPort: porterPort}},
+						},
+					},
+				},
+			},
+		},
+	}
+	_, err := s.f.Clientset.AppsV1().Deployments(s.Namespace).Create(&deployment)
+	return err
+}
+
+// CreatePorterService creates a Service that selects the porter Pods.
+func CreatePorterService(s *Sandbox, name string) error {
+	svc := apiv1.Service{
+		ObjectMeta: metav1.ObjectMeta{Namespace: s.Namespace, Name: name},
+		Spec: apiv1.ServiceSpec{
+			Selector: map[string]string{"app": "porter"},
+			Ports: []apiv1.ServicePort{
+				{
+					Port: porterPort,
+					Name: "http",
+				},
+			},
+		},
+	}
+	_, err := s.f.Clientset.CoreV1().Services(svc.Namespace).Create(&svc)
+	return err
+}
+
+// GetConfigMap gets the namespace:name ConfigMap and returns its Data field.
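+// Usage sketch (the namespace and name are the ones the ASM tests above use):
+//
+//	data, err := GetConfigMap(s, "kube-system", "ingress-controller-asm-cm-config")
+//	if err == nil && data["enable-asm"] == "true" { /* ASM mode requested */ }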
+func GetConfigMap(s *Sandbox, namespace, name string) (map[string]string, error) {
+	cm, err := s.f.Clientset.CoreV1().ConfigMaps(namespace).Get(name, metav1.GetOptions{})
+	if err != nil {
+		return nil, err
+	}
+	return cm.Data, nil
+}
+
+// EnsureConfigMap ensures the Data field of the namespace:name ConfigMap, creating the ConfigMap if it does not exist.
+func EnsureConfigMap(s *Sandbox, namespace, name string, data map[string]string) error {
+	cm := v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name}, Data: data}
+	_, err := s.f.Clientset.CoreV1().ConfigMaps(namespace).Update(&cm)
+	if err != nil && errors.IsNotFound(err) {
+		_, err = s.f.Clientset.CoreV1().ConfigMaps(namespace).Create(&cm)
+	}
+	return err
+}
+
+// DeleteConfigMap deletes the namespace:name ConfigMap.
+func DeleteConfigMap(s *Sandbox, namespace, name string) error {
+	return s.f.Clientset.CoreV1().ConfigMaps(namespace).Delete(name, &metav1.DeleteOptions{})
+}
+
+// EnsurePorterDestinationRule ensures the namespace:name DestinationRule.
+func EnsurePorterDestinationRule(s *Sandbox, name, svcName string, versions []string) error {
+	destinationRule := istioV1alpha3.DestinationRule{}
+	subsets := []*istioV1alpha3.Subset{}
+	for _, v := range versions {
+		subsets = append(subsets, &istioV1alpha3.Subset{Name: v, Labels: map[string]string{"version": v}})
+	}
+	destinationRule.Subsets = subsets
+	destinationRule.Host = svcName
+	spec, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&destinationRule)
+	if err != nil {
+		return fmt.Errorf("failed to convert DestinationRule to Unstructured: %v", err)
+	}
+
+	usDr, err := s.f.DestinationRuleClient.Namespace(s.Namespace).Get(name, metav1.GetOptions{})
+	if err != nil && errors.IsNotFound(err) {
+		usDr := unstructured.Unstructured{}
+		usDr.SetName(name)
+		usDr.SetNamespace(s.Namespace)
+		usDr.SetKind("DestinationRule")
+		usDr.SetAPIVersion("networking.istio.io/v1alpha3")
+		usDr.Object["spec"] = spec
+
+		_, err = s.f.DestinationRuleClient.Namespace(s.Namespace).Create(&usDr, metav1.CreateOptions{})
+		return err
+	}
+	usDr.Object["spec"] = spec
+	_, err = s.f.DestinationRuleClient.Namespace(s.Namespace).Update(usDr, metav1.UpdateOptions{})
+	return err
+}
+
+// DeleteDestinationRule deletes the namespace:name DestinationRule.
+func DeleteDestinationRule(s *Sandbox, namespace, name string) error {
+	return s.f.DestinationRuleClient.Namespace(namespace).Delete(name, &metav1.DeleteOptions{})
+}
diff --git a/pkg/e2e/framework.go b/pkg/e2e/framework.go
index d9caa47845..417bbed332 100644
--- a/pkg/e2e/framework.go
+++ b/pkg/e2e/framework.go
@@ -33,6 +33,8 @@ import (
 	computebeta "google.golang.org/api/compute/v0.beta"
 	compute "google.golang.org/api/compute/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
 	backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned"
@@ -48,8 +50,17 @@ type Options struct {
 	DestroySandboxes    bool
 	GceEndpointOverride string
 	CreateILBSubnet     bool
+	EnableIstio         bool
 }
 
+const (
+	destinationRuleGroup      = "networking.istio.io"
+	destinationRuleAPIVersion = "v1alpha3"
+	destinationRulePlural     = "destinationrules"
+	// This must match the group/plural constants above, in the form <plural>.<group>.
+	destinationRuleCRDName = "destinationrules.networking.istio.io"
+)
+
 // NewFramework returns a new test framework to run.
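+// A minimal wiring sketch (the project name is illustrative):
+//
+//	f := NewFramework(restConfig, Options{
+//		Project:     "my-project",
+//		EnableIstio: true, // without this, f.DestinationRuleClient stays nil
+//	})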
 func NewFramework(config *rest.Config, options Options) *Framework {
 	theCloud, err := NewCloud(options.Project, options.GceEndpointOverride)
@@ -60,6 +71,7 @@ func NewFramework(config *rest.Config, options Options) *Framework {
 	if err != nil {
 		klog.Fatalf("Failed to create BackendConfig client: %v", err)
 	}
+
 	f := &Framework{
 		RestConfig:          config,
 		Clientset:           kubernetes.NewForConfigOrDie(config),
@@ -73,13 +85,24 @@ func NewFramework(config *rest.Config, options Options) *Framework {
 		CreateILBSubnet:     options.CreateILBSubnet,
 	}
 	f.statusManager = NewStatusManager(f)
+	if options.EnableIstio {
+		dynamicClient, err := dynamic.NewForConfig(config)
+		if err != nil {
+			klog.Fatalf("Failed to create Dynamic client: %v", err)
+		}
+		destinationRuleGVR := schema.GroupVersionResource{Group: destinationRuleGroup, Version: destinationRuleAPIVersion, Resource: destinationRulePlural}
+		f.DestinationRuleClient = dynamicClient.Resource(destinationRuleGVR)
+	}
+
 	return f
 }
 
 // Framework is the end-to-end test framework.
 type Framework struct {
-	RestConfig *rest.Config
-	Clientset  *kubernetes.Clientset
+	RestConfig            *rest.Config
+	Clientset             *kubernetes.Clientset
+	DestinationRuleClient dynamic.NamespaceableResourceInterface
+
 	BackendConfigClient *backendconfigclient.Clientset
 	Project             string
 	Region              string
diff --git a/pkg/e2e/helpers.go b/pkg/e2e/helpers.go
index 553059f4c3..2856c24fe0 100644
--- a/pkg/e2e/helpers.go
+++ b/pkg/e2e/helpers.go
@@ -23,6 +23,7 @@ import (
 	"io/ioutil"
 	"net"
 	"net/http"
+	"sort"
 	"strings"
 	"time"
 
@@ -32,6 +33,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/api/networking/v1beta1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
 	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
@@ -68,6 +70,15 @@ type WaitForIngressOptions struct {
 	ExpectUnreachable bool
 }
 
+// Scheme is the default instance of runtime.Scheme to which types in the Kubernetes API are already registered.
+// This is needed for the ConfigMap event search.
+var Scheme = runtime.NewScheme()
+
+func init() {
+	// Register external types with Scheme.
+	v1.AddToScheme(Scheme)
+}
+
 // IsRfc1918Addr returns true if the address supplied is an RFC1918 address
 func IsRfc1918Addr(addr string) bool {
 	ip := net.ParseIP(addr)
@@ -293,10 +304,15 @@ func WaitForEchoDeploymentStable(s *Sandbox, name string) error {
 }
 
 // WaitForNegStatus waits until the NEG status on the service reaches the expected state.
-func WaitForNegStatus(s *Sandbox, name string, expectSvcPorts []string) (annotations.NegStatus, error) {
+// If noPresentTest is set to true, WaitForNegStatus instead makes sure no NEG annotation is added before the timeout (2 minutes).
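+// Usage sketch, mirroring the callers in this change:
+//
+//	negStatus, err := WaitForNegStatus(s, "service", []string{"80"}, false)       // expect the annotation
+//	noStatus, err := WaitForNegStatus(sSkip, "service-skip", []string{"80"}, true) // expect none; returns (nil, nil) on timeout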
+func WaitForNegStatus(s *Sandbox, name string, expectSvcPorts []string, noPresentTest bool) (*annotations.NegStatus, error) {
 	var ret annotations.NegStatus
 	var err error
-	err = wait.Poll(negPollInterval, gclbDeletionTimeout, func() (bool, error) {
+	timeout := gclbDeletionTimeout
+	if noPresentTest {
+		timeout = 2 * time.Minute
+	}
+	err = wait.Poll(negPollInterval, timeout, func() (bool, error) {
 		svc, err := s.f.Clientset.CoreV1().Services(s.Namespace).Get(name, metav1.GetOptions{})
 		if svc == nil || err != nil {
 			return false, fmt.Errorf("failed to get service %s/%s: %v", s.Namespace, name, err)
@@ -308,7 +324,10 @@ func WaitForNegStatus(s *Sandbox, name string, expectSvcPorts []string) (annotat
 		}
 		return true, nil
 	})
-	return ret, err
+	if noPresentTest && err == wait.ErrWaitTimeout {
+		return nil, nil
+	}
+	return &ret, err
 }
 
 // WaitForNegs waits until the input NEG reaches the expected state.
@@ -515,3 +534,67 @@ func CheckV2Finalizer(ing *v1beta1.Ingress) error {
 	}
 	return nil
 }
+
+// WaitDestinationRuleAnnotation waits until the NEG count in the DestinationRule's NEG annotation equals negCount.
+func WaitDestinationRuleAnnotation(s *Sandbox, namespace, name string, negCount int, timeout time.Duration) (*annotations.DestinationRuleNEGStatus, error) {
+	var rsl annotations.DestinationRuleNEGStatus
+	if err := wait.Poll(5*time.Second, timeout, func() (bool, error) {
+		unsDr, err := s.f.DestinationRuleClient.Namespace(namespace).Get(name, metav1.GetOptions{})
+		if err != nil {
+			return false, err
+		}
+		ann := unsDr.GetAnnotations()
+		klog.Infof("Waiting for DestinationRule NEG annotation, want count: %d, got annotations: %v", negCount, ann)
+		if ann != nil {
+			if val, ok := ann[annotations.NEGStatusKey]; ok {
+				rsl, err = annotations.ParseDestinationRuleNEGStatus(val)
+				if err != nil {
+					return false, err
+				}
+				if len(rsl.NetworkEndpointGroups) == negCount {
+					return true, nil
+				}
+			}
+		}
+		return false, nil
+	}); err != nil {
+		return nil, err
+	}
+	return &rsl, nil
+}
+
+// WaitConfigMapEvents waits until all of the msgs messages are present in the events of the namespace:name ConfigMap, or until timeout.
+func WaitConfigMapEvents(s *Sandbox, namespace, name string, msgs []string, timeout time.Duration) error {
+	cm, err := s.f.Clientset.CoreV1().ConfigMaps(namespace).Get(name, metav1.GetOptions{})
+	if err != nil {
+		return err
+	}
+	if cm == nil {
+		return fmt.Errorf("cannot find ConfigMap: %s/%s", namespace, name)
+	}
+	return wait.Poll(5*time.Second, timeout, func() (bool, error) {
+		eventList, err := s.f.Clientset.CoreV1().Events(namespace).Search(Scheme, cm)
+		if err != nil {
+			return false, err
+		}
+		if len(eventList.Items) < len(msgs) {
+			return false, nil
+		}
+		// Concatenate the most recent len(msgs) event messages, oldest first.
+		allMsg := ""
+		events := eventList.Items
+		sort.Slice(events, func(i, j int) bool {
+			return events[i].LastTimestamp.Before(&events[j].LastTimestamp)
+		})
+		for _, event := range events[len(events)-len(msgs):] {
+			allMsg += event.Message
+		}
+		klog.Infof("WaitConfigMapEvents, allMsg: %s, want: %v", allMsg, msgs)
+
+		for _, msg := range msgs {
+			if !strings.Contains(allMsg, msg) {
+				return false, nil
+			}
+		}
+		return true, nil
+	})
+}