From 5d24ee2bd170a0c2c636b7d1ebc7ebdd3ecf81ac Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Wed, 8 May 2024 09:35:42 -0700 Subject: [PATCH] xds: store server config for LRS server in xdsresource.ClusterUpdate (#7191) * xds: support LRS server config * switch to the new bootstrap package in internal/xds --- .../balancer/cdsbalancer/cdsbalancer.go | 16 +- xds/internal/xdsclient/authority.go | 11 +- .../xdsclient/clientimpl_authority.go | 2 +- xds/internal/xdsclient/clientimpl_watchers.go | 9 -- .../xdsclient/tests/resource_update_test.go | 20 ++- .../xdsresource/cluster_resource_type.go | 2 +- .../xdsclient/xdsresource/resource_type.go | 8 +- .../xdsresource/tests/unmarshal_cds_test.go | 36 +++-- .../xdsclient/xdsresource/type_cds.go | 21 +-- .../xdsclient/xdsresource/unmarshal_cds.go | 19 +-- .../xdsresource/unmarshal_cds_test.go | 148 ++++++++---------- 11 files changed, 126 insertions(+), 166 deletions(-) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer.go b/xds/internal/balancer/cdsbalancer/cdsbalancer.go index 0d0e9d2add42..8e97e104ed4b 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer.go @@ -609,21 +609,7 @@ func (b *cdsBalancer) generateDMsForCluster(name string, depth int, dms []cluste Cluster: cluster.ClusterName, EDSServiceName: cluster.EDSServiceName, MaxConcurrentRequests: cluster.MaxRequests, - } - if cluster.LRSServerConfig == xdsresource.ClusterLRSServerSelf { - bootstrapConfig := b.xdsClient.BootstrapConfig() - parsedName := xdsresource.ParseName(cluster.ClusterName) - if parsedName.Scheme == xdsresource.FederationScheme { - // Is a federation resource name, find the corresponding - // authority server config. - if cfg, ok := bootstrapConfig.Authorities[parsedName.Authority]; ok { - dm.LoadReportingServer = cfg.XDSServer - } - } else { - // Not a federation resource name, use the default - // authority. - dm.LoadReportingServer = bootstrapConfig.XDSServer - } + LoadReportingServer: cluster.LRSServerConfig, } case xdsresource.ClusterTypeLogicalDNS: dm = clusterresolver.DiscoveryMechanism{ diff --git a/xds/internal/xdsclient/authority.go b/xds/internal/xdsclient/authority.go index 6c82e70e19e2..62d7a1756e41 100644 --- a/xds/internal/xdsclient/authority.go +++ b/xds/internal/xdsclient/authority.go @@ -94,12 +94,6 @@ type authorityArgs struct { // (although the former is part of the latter) is because authorities in the // bootstrap config might contain an empty server config, and in this case, // the top-level server config is to be used. - // - // There are two code paths from where a new authority struct might be - // created. One is when a watch is registered for a resource, and one is - // when load reporting needs to be started. We have the authority name in - // the first case, but do in the second. We only have the server config in - // the second case. serverCfg *bootstrap.ServerConfig bootstrapCfg *bootstrap.Config serializer *grpcsync.CallbackSerializer @@ -156,7 +150,10 @@ func (a *authority) handleResourceUpdate(resourceUpdate transport.ResourceUpdate return xdsresource.NewErrorf(xdsresource.ErrorTypeResourceTypeUnsupported, "Resource URL %v unknown in response from server", resourceUpdate.URL) } - opts := &xdsresource.DecodeOptions{BootstrapConfig: a.bootstrapCfg} + opts := &xdsresource.DecodeOptions{ + BootstrapConfig: a.bootstrapCfg, + ServerConfig: a.serverCfg, + } updates, md, err := decodeAllResources(opts, rType, resourceUpdate) a.updateResourceStateAndScheduleCallbacks(rType, updates, md) return err diff --git a/xds/internal/xdsclient/clientimpl_authority.go b/xds/internal/xdsclient/clientimpl_authority.go index c73e3e2e7f32..69db79ee8913 100644 --- a/xds/internal/xdsclient/clientimpl_authority.go +++ b/xds/internal/xdsclient/clientimpl_authority.go @@ -36,7 +36,7 @@ import ( // authority, without holding c.authorityMu. // // Caller must not hold c.authorityMu. -func (c *clientImpl) findAuthority(n *xdsresource.Name) (_ *authority, unref func(), _ error) { +func (c *clientImpl) findAuthority(n *xdsresource.Name) (*authority, func(), error) { scheme, authority := n.Scheme, n.Authority c.authorityMu.Lock() diff --git a/xds/internal/xdsclient/clientimpl_watchers.go b/xds/internal/xdsclient/clientimpl_watchers.go index 045fe77a17ff..f64124dad643 100644 --- a/xds/internal/xdsclient/clientimpl_watchers.go +++ b/xds/internal/xdsclient/clientimpl_watchers.go @@ -48,15 +48,6 @@ func (c *clientImpl) WatchResource(rType xdsresource.Type, resourceName string, return func() {} } - // TODO: replace this with the code does the following when we have - // implemented generic watch API on the authority: - // - Parse the resource name and extract the authority. - // - Locate the corresponding authority object and acquire a reference to - // it. If the authority is not found, error out. - // - Call the watchResource() method on the authority. - // - Return a cancel function to cancel the watch on the authority and to - // release the reference. - // TODO: Make ParseName return an error if parsing fails, and // schedule the OnError callback in that case. n := xdsresource.ParseName(resourceName) diff --git a/xds/internal/xdsclient/tests/resource_update_test.go b/xds/internal/xdsclient/tests/resource_update_test.go index 783e9e413564..89b5d338892e 100644 --- a/xds/internal/xdsclient/tests/resource_update_test.go +++ b/xds/internal/xdsclient/tests/resource_update_test.go @@ -669,9 +669,8 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) { Resources: []*anypb.Any{testutils.MarshalAny(t, resource1)}, }, wantUpdate: xdsresource.ClusterUpdate{ - ClusterName: "resource-name-1", - EDSServiceName: "eds-service-name", - LRSServerConfig: xdsresource.ClusterLRSServerSelf, + ClusterName: "resource-name-1", + EDSServiceName: "eds-service-name", }, wantUpdateMetadata: map[string]xdsresource.UpdateWithMD{ "resource-name-1": { @@ -689,9 +688,8 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) { Resources: []*anypb.Any{testutils.MarshalAny(t, resource1), testutils.MarshalAny(t, resource2)}, }, wantUpdate: xdsresource.ClusterUpdate{ - ClusterName: "resource-name-1", - EDSServiceName: "eds-service-name", - LRSServerConfig: xdsresource.ClusterLRSServerSelf, + ClusterName: "resource-name-1", + EDSServiceName: "eds-service-name", }, wantUpdateMetadata: map[string]xdsresource.UpdateWithMD{ "resource-name-1": { @@ -763,6 +761,16 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) { if gotErr != nil && !strings.Contains(gotErr.Error(), test.wantErr) { t.Fatalf("Got error from handling update: %v, want %v", gotErr, test.wantErr) } + + // For tests expected to succeed, we expect an LRS server config in + // the update from the xDS client, because the LRS bit is turned on + // in the cluster resource. We *cannot* set the LRS server config in + // the test table because we do not have the address of the xDS + // server at that point, hence we do it here before verifying the + // received update. + if test.wantErr == "" { + test.wantUpdate.LRSServerConfig = xdstestutils.ServerConfigForAddress(t, mgmtServer.Address) + } cmpOpts := []cmp.Option{ cmpopts.EquateEmpty(), cmpopts.IgnoreFields(xdsresource.ClusterUpdate{}, "Raw", "LBPolicy"), diff --git a/xds/internal/xdsclient/xdsresource/cluster_resource_type.go b/xds/internal/xdsclient/xdsresource/cluster_resource_type.go index 183801c1c68c..5ac7f0312239 100644 --- a/xds/internal/xdsclient/xdsresource/cluster_resource_type.go +++ b/xds/internal/xdsclient/xdsresource/cluster_resource_type.go @@ -55,7 +55,7 @@ type clusterResourceType struct { // Decode deserializes and validates an xDS resource serialized inside the // provided `Any` proto, as received from the xDS management server. func (clusterResourceType) Decode(opts *DecodeOptions, resource *anypb.Any) (*DecodeResult, error) { - name, cluster, err := unmarshalClusterResource(resource) + name, cluster, err := unmarshalClusterResource(resource, opts.ServerConfig) switch { case name == "": // Name is unset only when protobuf deserialization fails. diff --git a/xds/internal/xdsclient/xdsresource/resource_type.go b/xds/internal/xdsclient/xdsresource/resource_type.go index cf1dff7fe748..a1e15e2d3e21 100644 --- a/xds/internal/xdsclient/xdsresource/resource_type.go +++ b/xds/internal/xdsclient/xdsresource/resource_type.go @@ -133,9 +133,13 @@ type ResourceData interface { // DecodeOptions wraps the options required by ResourceType implementation for // decoding configuration received from the xDS management server. type DecodeOptions struct { - // BootstrapConfig contains the bootstrap configuration passed to the - // top-level xdsClient. This contains useful data for resource validation. + // BootstrapConfig contains the complete bootstrap configuration passed to + // the xDS client. This contains useful data for resource validation. BootstrapConfig *bootstrap.Config + // ServerConfig contains the server config (from the above bootstrap + // configuration) of the xDS server from which the current resource, for + // which Decode() is being invoked, was received. + ServerConfig *bootstrap.ServerConfig } // DecodeResult is the result of a decode operation. diff --git a/xds/internal/xdsclient/xdsresource/tests/unmarshal_cds_test.go b/xds/internal/xdsclient/xdsresource/tests/unmarshal_cds_test.go index e373034e5635..fff78fe00bd7 100644 --- a/xds/internal/xdsclient/xdsresource/tests/unmarshal_cds_test.go +++ b/xds/internal/xdsclient/xdsresource/tests/unmarshal_cds_test.go @@ -26,18 +26,20 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "google.golang.org/grpc/balancer/leastrequest" - _ "google.golang.org/grpc/balancer/roundrobin" // To register round_robin load balancer. "google.golang.org/grpc/internal/balancer/stub" "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/grpctest" iserviceconfig "google.golang.org/grpc/internal/serviceconfig" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/internal/xds/bootstrap" "google.golang.org/grpc/serviceconfig" - _ "google.golang.org/grpc/xds" // Register the xDS LB Registry Converters. "google.golang.org/grpc/xds/internal/balancer/ringhash" "google.golang.org/grpc/xds/internal/balancer/wrrlocality" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/structpb" "google.golang.org/protobuf/types/known/wrapperspb" v3xdsxdstypepb "github.com/cncf/xds/go/xds/type/v3" @@ -48,9 +50,9 @@ import ( v3ringhashpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/ring_hash/v3" v3roundrobinpb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/round_robin/v3" v3wrrlocalitypb "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/wrr_locality/v3" - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/known/anypb" - "google.golang.org/protobuf/types/known/structpb" + + _ "google.golang.org/grpc/balancer/roundrobin" // To register round_robin load balancer. + _ "google.golang.org/grpc/xds" // Register the xDS LB Registry Converters. ) type s struct { @@ -66,8 +68,6 @@ const ( serviceName = "service" ) -var emptyUpdate = xdsresource.ClusterUpdate{ClusterName: clusterName, LRSServerConfig: xdsresource.ClusterLRSOff} - func wrrLocality(t *testing.T, m proto.Message) *v3wrrlocalitypb.WrrLocality { return &v3wrrlocalitypb.WrrLocality{ EndpointPickingPolicy: &v3clusterpb.LoadBalancingPolicy{ @@ -105,6 +105,7 @@ func (s) TestValidateCluster_Success(t *testing.T) { tests := []struct { name string cluster *v3clusterpb.Cluster + serverCfg *bootstrap.ServerConfig wantUpdate xdsresource.ClusterUpdate wantLBConfig *iserviceconfig.BalancerConfig }{ @@ -164,7 +165,8 @@ func (s) TestValidateCluster_Success(t *testing.T) { LbPolicy: v3clusterpb.Cluster_ROUND_ROBIN, }, wantUpdate: xdsresource.ClusterUpdate{ - ClusterName: clusterName, LRSServerConfig: xdsresource.ClusterLRSOff, ClusterType: xdsresource.ClusterTypeAggregate, + ClusterName: clusterName, + ClusterType: xdsresource.ClusterTypeAggregate, PrioritizedClusterNames: []string{"a", "b", "c"}, }, wantLBConfig: &iserviceconfig.BalancerConfig{ @@ -179,7 +181,7 @@ func (s) TestValidateCluster_Success(t *testing.T) { { name: "happy-case-no-service-name-no-lrs", cluster: e2e.DefaultCluster(clusterName, "", e2e.SecurityLevelNone), - wantUpdate: emptyUpdate, + wantUpdate: xdsresource.ClusterUpdate{ClusterName: clusterName}, wantLBConfig: &iserviceconfig.BalancerConfig{ Name: wrrlocality.Name, Config: &wrrlocality.LBConfig{ @@ -206,16 +208,17 @@ func (s) TestValidateCluster_Success(t *testing.T) { }, }, { - name: "happiest-case", + name: "happiest-case-with-lrs", cluster: e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ ClusterName: clusterName, ServiceName: serviceName, EnableLRS: true, }), + serverCfg: &bootstrap.ServerConfig{ServerURI: "test-server-uri"}, wantUpdate: xdsresource.ClusterUpdate{ ClusterName: clusterName, EDSServiceName: serviceName, - LRSServerConfig: xdsresource.ClusterLRSServerSelf, + LRSServerConfig: &bootstrap.ServerConfig{ServerURI: "test-server-uri"}, }, wantLBConfig: &iserviceconfig.BalancerConfig{ Name: wrrlocality.Name, @@ -248,10 +251,11 @@ func (s) TestValidateCluster_Success(t *testing.T) { } return c }(), + serverCfg: &bootstrap.ServerConfig{ServerURI: "test-server-uri"}, wantUpdate: xdsresource.ClusterUpdate{ ClusterName: clusterName, EDSServiceName: serviceName, - LRSServerConfig: xdsresource.ClusterLRSServerSelf, + LRSServerConfig: &bootstrap.ServerConfig{ServerURI: "test-server-uri"}, MaxRequests: func() *uint32 { i := uint32(512); return &i }(), }, wantLBConfig: &iserviceconfig.BalancerConfig{ @@ -298,7 +302,8 @@ func (s) TestValidateCluster_Success(t *testing.T) { LbPolicy: v3clusterpb.Cluster_LEAST_REQUEST, }, wantUpdate: xdsresource.ClusterUpdate{ - ClusterName: clusterName, EDSServiceName: serviceName, + ClusterName: clusterName, + EDSServiceName: serviceName, }, wantLBConfig: &iserviceconfig.BalancerConfig{ Name: "least_request_experimental", @@ -353,7 +358,8 @@ func (s) TestValidateCluster_Success(t *testing.T) { }, }, wantUpdate: xdsresource.ClusterUpdate{ - ClusterName: clusterName, EDSServiceName: serviceName, + ClusterName: clusterName, + EDSServiceName: serviceName, }, wantLBConfig: &iserviceconfig.BalancerConfig{ Name: "least_request_experimental", @@ -527,7 +533,7 @@ func (s) TestValidateCluster_Success(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - update, err := xdsresource.ValidateClusterAndConstructClusterUpdateForTesting(test.cluster) + update, err := xdsresource.ValidateClusterAndConstructClusterUpdateForTesting(test.cluster, test.serverCfg) if err != nil { t.Errorf("validateClusterAndConstructClusterUpdate(%+v) failed: %v", test.cluster, err) } diff --git a/xds/internal/xdsclient/xdsresource/type_cds.go b/xds/internal/xdsclient/xdsresource/type_cds.go index b782e2455492..ae4cb1e46545 100644 --- a/xds/internal/xdsclient/xdsresource/type_cds.go +++ b/xds/internal/xdsclient/xdsresource/type_cds.go @@ -20,6 +20,7 @@ package xdsresource import ( "encoding/json" + "google.golang.org/grpc/internal/xds/bootstrap" "google.golang.org/protobuf/types/known/anypb" ) @@ -39,18 +40,6 @@ const ( ClusterTypeAggregate ) -// ClusterLRSServerConfigType is the type of LRS server config. -type ClusterLRSServerConfigType int - -const ( - // ClusterLRSOff indicates LRS is off (loads are not reported for this - // cluster). - ClusterLRSOff ClusterLRSServerConfigType = iota - // ClusterLRSServerSelf indicates loads should be reported to the same - // server (the authority) where the CDS resp is received from. - ClusterLRSServerSelf -) - // ClusterUpdate contains information from a received CDS response, which is of // interest to the registered CDS watcher. type ClusterUpdate struct { @@ -60,10 +49,10 @@ type ClusterUpdate struct { // EDSServiceName is an optional name for EDS. If it's not set, the balancer // should watch ClusterName for the EDS resources. EDSServiceName string - // LRSServerConfig contains the server where the load reports should be sent - // to. This can be change to an interface, to support other types, e.g. a - // ServerConfig with ServerURI, creds. - LRSServerConfig ClusterLRSServerConfigType + // LRSServerConfig contains configuration about the xDS server that sent + // this cluster resource. This is also the server where load reports are to + // be sent, for this cluster. + LRSServerConfig *bootstrap.ServerConfig // SecurityCfg contains security configuration sent by the control plane. SecurityCfg *SecurityConfig // MaxRequests for circuit breaking, if any (otherwise nil). diff --git a/xds/internal/xdsclient/xdsresource/unmarshal_cds.go b/xds/internal/xdsclient/xdsresource/unmarshal_cds.go index 276bf405e330..de12f478bd06 100644 --- a/xds/internal/xdsclient/xdsresource/unmarshal_cds.go +++ b/xds/internal/xdsclient/xdsresource/unmarshal_cds.go @@ -34,6 +34,7 @@ import ( "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/pretty" iserviceconfig "google.golang.org/grpc/internal/serviceconfig" + "google.golang.org/grpc/internal/xds/bootstrap" "google.golang.org/grpc/internal/xds/matcher" "google.golang.org/grpc/xds/internal/xdsclient/xdslbregistry" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" @@ -50,7 +51,7 @@ var ValidateClusterAndConstructClusterUpdateForTesting = validateClusterAndConst // to this value by the management server. const transportSocketName = "envoy.transport_sockets.tls" -func unmarshalClusterResource(r *anypb.Any) (string, ClusterUpdate, error) { +func unmarshalClusterResource(r *anypb.Any, serverCfg *bootstrap.ServerConfig) (string, ClusterUpdate, error) { r, err := UnwrapResource(r) if err != nil { return "", ClusterUpdate{}, fmt.Errorf("failed to unwrap resource: %v", err) @@ -64,7 +65,7 @@ func unmarshalClusterResource(r *anypb.Any) (string, ClusterUpdate, error) { if err := proto.Unmarshal(r.GetValue(), cluster); err != nil { return "", ClusterUpdate{}, fmt.Errorf("failed to unmarshal resource: %v", err) } - cu, err := validateClusterAndConstructClusterUpdate(cluster) + cu, err := validateClusterAndConstructClusterUpdate(cluster, serverCfg) if err != nil { return cluster.GetName(), ClusterUpdate{}, err } @@ -81,7 +82,7 @@ const ( defaultLeastRequestChoiceCount = 2 ) -func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (ClusterUpdate, error) { +func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster, serverCfg *bootstrap.ServerConfig) (ClusterUpdate, error) { telemetryLabels := make(map[string]string) if fmd := cluster.GetMetadata().GetFilterMetadata(); fmd != nil { if val, ok := fmd["com.google.csm.telemetry_labels"]; ok { @@ -182,21 +183,11 @@ func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (Clu TelemetryLabels: telemetryLabels, } - // Note that this is different from the gRFC (gRFC A47 says to include the - // full ServerConfig{URL,creds,server feature} here). This information is - // not available here, because this function doesn't have access to the - // xdsclient bootstrap information now (can be added if necessary). The - // ServerConfig will be read and populated by the CDS balancer when - // processing this field. - // According to A27: - // If the `lrs_server` field is set, it must have its `self` field set, in - // which case the client should use LRS for load reporting. Otherwise - // (the `lrs_server` field is not set), LRS load reporting will be disabled. if lrs := cluster.GetLrsServer(); lrs != nil { if lrs.GetSelf() == nil { return ClusterUpdate{}, fmt.Errorf("unsupported config_source_specifier %T in lrs_server field", lrs.ConfigSourceSpecifier) } - ret.LRSServerConfig = ClusterLRSServerSelf + ret.LRSServerConfig = serverCfg } // Validate and set cluster type from the response. diff --git a/xds/internal/xdsclient/xdsresource/unmarshal_cds_test.go b/xds/internal/xdsclient/xdsresource/unmarshal_cds_test.go index a345466961c2..d008b2510c31 100644 --- a/xds/internal/xdsclient/xdsresource/unmarshal_cds_test.go +++ b/xds/internal/xdsclient/xdsresource/unmarshal_cds_test.go @@ -27,6 +27,7 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "google.golang.org/grpc/internal/pretty" "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/xds/bootstrap" "google.golang.org/grpc/internal/xds/matcher" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" @@ -50,14 +51,11 @@ const ( serviceName = "service" ) -var emptyUpdate = ClusterUpdate{ClusterName: clusterName, LRSServerConfig: ClusterLRSOff} - func (s) TestValidateCluster_Failure(t *testing.T) { tests := []struct { - name string - cluster *v3clusterpb.Cluster - wantUpdate ClusterUpdate - wantErr bool + name string + cluster *v3clusterpb.Cluster + wantErr bool }{ { name: "non-supported-cluster-type-static", @@ -72,8 +70,7 @@ func (s) TestValidateCluster_Failure(t *testing.T) { }, LbPolicy: v3clusterpb.Cluster_LEAST_REQUEST, }, - wantUpdate: emptyUpdate, - wantErr: true, + wantErr: true, }, { name: "non-supported-cluster-type-original-dst", @@ -88,8 +85,7 @@ func (s) TestValidateCluster_Failure(t *testing.T) { }, LbPolicy: v3clusterpb.Cluster_LEAST_REQUEST, }, - wantUpdate: emptyUpdate, - wantErr: true, + wantErr: true, }, { name: "no-eds-config", @@ -97,8 +93,7 @@ func (s) TestValidateCluster_Failure(t *testing.T) { ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_EDS}, LbPolicy: v3clusterpb.Cluster_ROUND_ROBIN, }, - wantUpdate: emptyUpdate, - wantErr: true, + wantErr: true, }, { name: "no-ads-config-source", @@ -107,8 +102,7 @@ func (s) TestValidateCluster_Failure(t *testing.T) { EdsClusterConfig: &v3clusterpb.Cluster_EdsClusterConfig{}, LbPolicy: v3clusterpb.Cluster_ROUND_ROBIN, }, - wantUpdate: emptyUpdate, - wantErr: true, + wantErr: true, }, { name: "non-round-robin-or-ring-hash-lb-policy", @@ -123,8 +117,7 @@ func (s) TestValidateCluster_Failure(t *testing.T) { }, LbPolicy: v3clusterpb.Cluster_LEAST_REQUEST, }, - wantUpdate: emptyUpdate, - wantErr: true, + wantErr: true, }, { name: "logical-dns-multiple-localities", @@ -140,8 +133,7 @@ func (s) TestValidateCluster_Failure(t *testing.T) { }, }, }, - wantUpdate: emptyUpdate, - wantErr: true, + wantErr: true, }, { name: "ring-hash-hash-function-not-xx-hash", @@ -153,8 +145,7 @@ func (s) TestValidateCluster_Failure(t *testing.T) { }, }, }, - wantUpdate: emptyUpdate, - wantErr: true, + wantErr: true, }, { name: "least-request-choice-count-less-than-two", @@ -166,8 +157,7 @@ func (s) TestValidateCluster_Failure(t *testing.T) { }, }, }, - wantUpdate: emptyUpdate, - wantErr: true, + wantErr: true, }, { name: "ring-hash-max-bound-greater-than-upper-bound", @@ -179,8 +169,7 @@ func (s) TestValidateCluster_Failure(t *testing.T) { }, }, }, - wantUpdate: emptyUpdate, - wantErr: true, + wantErr: true, }, { name: "ring-hash-max-bound-greater-than-upper-bound-load-balancing-policy", @@ -209,8 +198,7 @@ func (s) TestValidateCluster_Failure(t *testing.T) { }, }, }, - wantUpdate: emptyUpdate, - wantErr: true, + wantErr: true, }, { name: "least-request-unsupported-in-converter-since-env-var-unset", @@ -235,8 +223,7 @@ func (s) TestValidateCluster_Failure(t *testing.T) { }, }, }, - wantUpdate: emptyUpdate, - wantErr: true, + wantErr: true, }, { name: "aggregate-nil-clusters", @@ -250,8 +237,7 @@ func (s) TestValidateCluster_Failure(t *testing.T) { }, LbPolicy: v3clusterpb.Cluster_ROUND_ROBIN, }, - wantUpdate: emptyUpdate, - wantErr: true, + wantErr: true, }, { name: "aggregate-empty-clusters", @@ -267,14 +253,13 @@ func (s) TestValidateCluster_Failure(t *testing.T) { }, LbPolicy: v3clusterpb.Cluster_ROUND_ROBIN, }, - wantUpdate: emptyUpdate, - wantErr: true, + wantErr: true, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - if update, err := validateClusterAndConstructClusterUpdate(test.cluster); err == nil { + if update, err := validateClusterAndConstructClusterUpdate(test.cluster, nil); err == nil { t.Errorf("validateClusterAndConstructClusterUpdate(%+v) = %v, wanted error", test.cluster, update) } }) @@ -882,9 +867,8 @@ func (s) TestValidateClusterWithSecurityConfig(t *testing.T) { }, }, wantUpdate: ClusterUpdate{ - ClusterName: clusterName, - EDSServiceName: serviceName, - LRSServerConfig: ClusterLRSOff, + ClusterName: clusterName, + EDSServiceName: serviceName, SecurityCfg: &SecurityConfig{ RootInstanceName: rootPluginInstance, RootCertName: rootCertName, @@ -924,9 +908,8 @@ func (s) TestValidateClusterWithSecurityConfig(t *testing.T) { }, }, wantUpdate: ClusterUpdate{ - ClusterName: clusterName, - EDSServiceName: serviceName, - LRSServerConfig: ClusterLRSOff, + ClusterName: clusterName, + EDSServiceName: serviceName, SecurityCfg: &SecurityConfig{ RootInstanceName: rootPluginInstance, RootCertName: rootCertName, @@ -968,9 +951,8 @@ func (s) TestValidateClusterWithSecurityConfig(t *testing.T) { }, }, wantUpdate: ClusterUpdate{ - ClusterName: clusterName, - EDSServiceName: serviceName, - LRSServerConfig: ClusterLRSOff, + ClusterName: clusterName, + EDSServiceName: serviceName, SecurityCfg: &SecurityConfig{ RootInstanceName: rootPluginInstance, RootCertName: rootCertName, @@ -1016,9 +998,8 @@ func (s) TestValidateClusterWithSecurityConfig(t *testing.T) { }, }, wantUpdate: ClusterUpdate{ - ClusterName: clusterName, - EDSServiceName: serviceName, - LRSServerConfig: ClusterLRSOff, + ClusterName: clusterName, + EDSServiceName: serviceName, SecurityCfg: &SecurityConfig{ RootInstanceName: rootPluginInstance, RootCertName: rootCertName, @@ -1076,9 +1057,8 @@ func (s) TestValidateClusterWithSecurityConfig(t *testing.T) { }, }, wantUpdate: ClusterUpdate{ - ClusterName: clusterName, - EDSServiceName: serviceName, - LRSServerConfig: ClusterLRSOff, + ClusterName: clusterName, + EDSServiceName: serviceName, SecurityCfg: &SecurityConfig{ RootInstanceName: rootPluginInstance, RootCertName: rootCertName, @@ -1143,9 +1123,8 @@ func (s) TestValidateClusterWithSecurityConfig(t *testing.T) { }, }, wantUpdate: ClusterUpdate{ - ClusterName: clusterName, - EDSServiceName: serviceName, - LRSServerConfig: ClusterLRSOff, + ClusterName: clusterName, + EDSServiceName: serviceName, SecurityCfg: &SecurityConfig{ RootInstanceName: rootPluginInstance, RootCertName: rootCertName, @@ -1165,7 +1144,7 @@ func (s) TestValidateClusterWithSecurityConfig(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - update, err := validateClusterAndConstructClusterUpdate(test.cluster) + update, err := validateClusterAndConstructClusterUpdate(test.cluster, nil) if (err != nil) != test.wantErr { t.Errorf("validateClusterAndConstructClusterUpdate() returned err %v wantErr %v)", err, test.wantErr) } @@ -1287,6 +1266,7 @@ func (s) TestUnmarshalCluster(t *testing.T) { tests := []struct { name string resource *anypb.Any + serverCfg *bootstrap.ServerConfig wantName string wantUpdate ClusterUpdate wantErr bool @@ -1337,43 +1317,50 @@ func (s) TestUnmarshalCluster(t *testing.T) { wantErr: true, }, { - name: "v3 cluster", - resource: v3ClusterAny, - wantName: v3ClusterName, + name: "v3 cluster", + resource: v3ClusterAny, + serverCfg: &bootstrap.ServerConfig{ServerURI: "test-server-uri"}, + wantName: v3ClusterName, wantUpdate: ClusterUpdate{ - ClusterName: v3ClusterName, - EDSServiceName: v3Service, LRSServerConfig: ClusterLRSServerSelf, - Raw: v3ClusterAny, + ClusterName: v3ClusterName, + EDSServiceName: v3Service, + LRSServerConfig: &bootstrap.ServerConfig{ServerURI: "test-server-uri"}, + Raw: v3ClusterAny, }, }, { - name: "v3 cluster wrapped", - resource: testutils.MarshalAny(t, &v3discoverypb.Resource{Resource: v3ClusterAny}), - wantName: v3ClusterName, + name: "v3 cluster wrapped", + resource: testutils.MarshalAny(t, &v3discoverypb.Resource{Resource: v3ClusterAny}), + serverCfg: &bootstrap.ServerConfig{ServerURI: "test-server-uri"}, + wantName: v3ClusterName, wantUpdate: ClusterUpdate{ - ClusterName: v3ClusterName, - EDSServiceName: v3Service, LRSServerConfig: ClusterLRSServerSelf, - Raw: v3ClusterAny, + ClusterName: v3ClusterName, + EDSServiceName: v3Service, + LRSServerConfig: &bootstrap.ServerConfig{ServerURI: "test-server-uri"}, + Raw: v3ClusterAny, }, }, { - name: "v3 cluster with EDS config source self", - resource: v3ClusterAnyWithEDSConfigSourceSelf, - wantName: v3ClusterName, + name: "v3 cluster with EDS config source self", + resource: v3ClusterAnyWithEDSConfigSourceSelf, + serverCfg: &bootstrap.ServerConfig{ServerURI: "test-server-uri"}, + wantName: v3ClusterName, wantUpdate: ClusterUpdate{ - ClusterName: v3ClusterName, - EDSServiceName: v3Service, LRSServerConfig: ClusterLRSServerSelf, - Raw: v3ClusterAnyWithEDSConfigSourceSelf, + ClusterName: v3ClusterName, + EDSServiceName: v3Service, + LRSServerConfig: &bootstrap.ServerConfig{ServerURI: "test-server-uri"}, + Raw: v3ClusterAnyWithEDSConfigSourceSelf, }, }, { - name: "v3 cluster with telemetry case", - resource: v3ClusterAnyWithTelemetryLabels, - wantName: v3ClusterName, + name: "v3 cluster with telemetry case", + resource: v3ClusterAnyWithTelemetryLabels, + serverCfg: &bootstrap.ServerConfig{ServerURI: "test-server-uri"}, + wantName: v3ClusterName, wantUpdate: ClusterUpdate{ ClusterName: v3ClusterName, EDSServiceName: v3Service, - LRSServerConfig: ClusterLRSServerSelf, + LRSServerConfig: &bootstrap.ServerConfig{ServerURI: "test-server-uri"}, Raw: v3ClusterAnyWithTelemetryLabels, TelemetryLabels: map[string]string{ "service_name": "grpc-service", @@ -1382,13 +1369,14 @@ func (s) TestUnmarshalCluster(t *testing.T) { }, }, { - name: "v3 metadata ignore other types not string and not com.google.csm.telemetry_labels", - resource: v3ClusterAnyWithTelemetryLabelsIgnoreSome, - wantName: v3ClusterName, + name: "v3 metadata ignore other types not string and not com.google.csm.telemetry_labels", + resource: v3ClusterAnyWithTelemetryLabelsIgnoreSome, + serverCfg: &bootstrap.ServerConfig{ServerURI: "test-server-uri"}, + wantName: v3ClusterName, wantUpdate: ClusterUpdate{ ClusterName: v3ClusterName, EDSServiceName: v3Service, - LRSServerConfig: ClusterLRSServerSelf, + LRSServerConfig: &bootstrap.ServerConfig{ServerURI: "test-server-uri"}, Raw: v3ClusterAnyWithTelemetryLabelsIgnoreSome, TelemetryLabels: map[string]string{ "service_name": "grpc-service", @@ -1415,7 +1403,7 @@ func (s) TestUnmarshalCluster(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - name, update, err := unmarshalClusterResource(test.resource) + name, update, err := unmarshalClusterResource(test.resource, test.serverCfg) if (err != nil) != test.wantErr { t.Fatalf("unmarshalClusterResource(%s), got err: %v, wantErr: %v", pretty.ToJSON(test.resource), err, test.wantErr) } @@ -1584,7 +1572,7 @@ func (s) TestValidateClusterWithOutlierDetection(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - update, err := validateClusterAndConstructClusterUpdate(test.cluster) + update, err := validateClusterAndConstructClusterUpdate(test.cluster, nil) if (err != nil) != test.wantErr { t.Errorf("validateClusterAndConstructClusterUpdate() returned err %v wantErr %v)", err, test.wantErr) }