From 19dd066be156c4243365900d17d39caeb37988e8 Mon Sep 17 00:00:00 2001 From: Pavithra Ramesh Date: Fri, 31 Jul 2020 08:16:12 -0700 Subject: [PATCH 1/2] Allow L4 ILB proto to be modified. Protocol modification results in forwarding rule being delete before backend service update. Forwarding rule reuses the IP from service status, to avoid IP address change due to proto change. Other changes: Also use GA API for forwarding rule creation always since Global Access is GA. Remove feature gate logic for Custom Subnet. Simplified Forwarding Rule Equality Logic. If the IP address in old and new FR do not match, it is considered to be a change. Special case for empty IP removed. --- pkg/backends/backends.go | 7 ++- pkg/loadbalancers/forwarding_rules.go | 87 +++++++++------------------ pkg/loadbalancers/l4.go | 22 ++++++- 3 files changed, 53 insertions(+), 63 deletions(-) diff --git a/pkg/backends/backends.go b/pkg/backends/backends.go index 0e2f6594ce..b508364ca3 100644 --- a/pkg/backends/backends.go +++ b/pkg/backends/backends.go @@ -265,6 +265,9 @@ func (b *Backends) EnsureL4BackendService(name, hcLink, protocol, sessionAffinit } if protocol == string(api_v1.ProtocolTCP) { expectedBS.ConnectionDraining = &composite.ConnectionDraining{DrainingTimeoutSec: DefaultConnectionDrainingTimeoutSeconds} + } else { + // This config is not supported in UDP mode, explicitly set to 0 to reset, if proto was TCP previously. + expectedBS.ConnectionDraining = &composite.ConnectionDraining{DrainingTimeoutSec: 0} } // Create backend service if none was found @@ -284,8 +287,8 @@ func (b *Backends) EnsureL4BackendService(name, hcLink, protocol, sessionAffinit if backendSvcEqual(expectedBS, bs) { return bs, nil } - if bs.ConnectionDraining != nil && bs.ConnectionDraining.DrainingTimeoutSec > 0 { - // if user overrides this value, continue using that. + if bs.ConnectionDraining != nil && bs.ConnectionDraining.DrainingTimeoutSec > 0 && protocol == string(api_v1.ProtocolTCP) { + // only preserves user overridden timeout value when the protocol is TCP expectedBS.ConnectionDraining.DrainingTimeoutSec = bs.ConnectionDraining.DrainingTimeoutSec } klog.V(2).Infof("EnsureL4BackendService: updating backend service %v", name) diff --git a/pkg/loadbalancers/forwarding_rules.go b/pkg/loadbalancers/forwarding_rules.go index b86a1fa403..7809845c75 100644 --- a/pkg/loadbalancers/forwarding_rules.go +++ b/pkg/loadbalancers/forwarding_rules.go @@ -196,61 +196,27 @@ func (l *L4) ensureForwardingRule(loadBalancerName, bsLink string, options gce.I if err != nil { return nil, err } - desc := utils.L4ILBResourceDescription{} // version used for creating the existing forwarding rule. - existingVersion := meta.VersionGA - // version to use for the new forwarding rule - newVersion := getAPIVersion(options) + version := meta.VersionGA // Get the GA version forwarding rule, use the description to identify the version it was created with. existingFwdRule, err := composite.GetForwardingRule(l.cloud, key, meta.VersionGA) if utils.IgnoreHTTPNotFound(err) != nil { return nil, err } - if existingFwdRule != nil { - if err = desc.Unmarshal(existingFwdRule.Description); err != nil { - klog.Warningf("Failed to lookup forwarding rule version from description, err %v. Using GA Version.", err) - } else { - existingVersion = desc.APIVersion - } - } - // Fetch the right forwarding rule in case it is not using GA - if existingVersion != meta.VersionGA { - existingFwdRule, err = composite.GetForwardingRule(l.cloud, key, existingVersion) - if utils.IgnoreHTTPNotFound(err) != nil { - klog.Errorf("Failed to lookup forwarding rule '%s' at version - %s, err %v", key.Name, existingVersion, err) - return nil, err - } - } - if l.cloud.IsLegacyNetwork() { l.recorder.Event(l.Service, v1.EventTypeWarning, "ILBOptionsIgnored", "Internal LoadBalancer options are not supported with Legacy Networks.") options = gce.ILBOptions{} } subnetworkURL := l.cloud.SubnetworkURL() - if !l.cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureILBCustomSubnet) { - if options.SubnetName != "" { - l.recorder.Event(l.Service, v1.EventTypeWarning, "ILBCustomSubnetOptionIgnored", "Internal LoadBalancer CustomSubnet options ignored as the feature gate is disabled.") - options.SubnetName = "" - } - } - if l.cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureILBCustomSubnet) { - // If this feature is enabled, changes to subnet annotation will be - // picked up and reflected in the forwarding rule. - // Removing the annotation will set the forwarding rule to use the default subnet. - if options.SubnetName != "" { - subnetKey := *key - subnetKey.Name = options.SubnetName - subnetworkURL = cloud.SelfLink(meta.VersionGA, l.cloud.ProjectID(), "subnetworks", &subnetKey) - } - } else { - // TODO(84885) remove this once ILBCustomSubnet goes beta. - if existingFwdRule != nil && existingFwdRule.Subnetwork != "" { - // If the ILB already exists, continue using the subnet that it's already using. - // This is to support existing ILBs that were setup using the wrong subnet - https://github.com/kubernetes/kubernetes/pull/57861 - subnetworkURL = existingFwdRule.Subnetwork - } + // Custom subnet feature is always enabled when running L4 controller. + // Changes to subnet annotation will be picked up and reflected in the forwarding rule. + // Removing the annotation will set the forwarding rule to use the default subnet. + if options.SubnetName != "" { + subnetKey := *key + subnetKey.Name = options.SubnetName + subnetworkURL = cloud.SelfLink(meta.VersionGA, l.cloud.ProjectID(), "subnetworks", &subnetKey) } // Determine IP which will be used for this LB. If no forwarding rule has been established // or specified in the Service spec, then requestedIP = "". @@ -279,7 +245,7 @@ func (l *L4) ensureForwardingRule(loadBalancerName, bsLink string, options gce.I ports, _, protocol := utils.GetPortsAndProtocol(l.Service.Spec.Ports) // Create the forwarding rule frDesc, err := utils.MakeL4ILBServiceDescription(utils.ServiceKeyFunc(l.Service.Namespace, l.Service.Name), ipToUse, - newVersion) + version) if err != nil { return nil, fmt.Errorf("Failed to compute description for forwarding rule %s, err: %v", loadBalancerName, err) @@ -293,7 +259,7 @@ func (l *L4) ensureForwardingRule(loadBalancerName, bsLink string, options gce.I LoadBalancingScheme: string(cloud.SchemeInternal), Subnetwork: subnetworkURL, Network: l.cloud.NetworkURL(), - Version: newVersion, + Version: version, BackendService: bsLink, AllowGlobalAccess: options.AllowGlobalAccess, Description: frDesc, @@ -308,7 +274,7 @@ func (l *L4) ensureForwardingRule(loadBalancerName, bsLink string, options gce.I // If the forwarding rule pointed to a backend service which does not match the controller naming scheme, // that resouce could be leaked. It is not being deleted here because that is a user-managed resource. klog.V(2).Infof("ensureForwardingRule: Deleting existing forwarding rule - %s, will be recreated", fr.Name) - if err = utils.IgnoreHTTPNotFound(composite.DeleteForwardingRule(l.cloud, key, existingVersion)); err != nil { + if err = utils.IgnoreHTTPNotFound(composite.DeleteForwardingRule(l.cloud, key, version)); err != nil { return nil, err } l.recorder.Eventf(l.Service, corev1.EventTypeNormal, events.SyncIngress, "ForwardingRule %q deleted", key.Name) @@ -321,13 +287,19 @@ func (l *L4) ensureForwardingRule(loadBalancerName, bsLink string, options gce.I return composite.GetForwardingRule(l.cloud, key, fr.Version) } +func (l *L4) deleteForwardingRule(name string, version meta.Version) { + key, err := l.CreateKey(name) + if err != nil { + klog.Errorf("Failed to create key for deleting forwarding rule %s, err: %v", name, err) + return + } + if err := utils.IgnoreHTTPNotFound(composite.DeleteForwardingRule(l.cloud, key, version)); err != nil { + klog.Errorf("Failed to delete forwarding rule %s, err: %v", name, err) + } +} + func Equal(fr1, fr2 *composite.ForwardingRule) bool { - // If one of the IP addresses is empty, do not consider it as an inequality. - // If the IP address drops from a valid IP to empty, we do not want to apply - // the change if it is the only change in the forwarding rule. Similarly, if - // the forwarding rule changes from an empty IP to an allocated IP address, the - // subnetwork will change as well. - return (fr1.IPAddress == "" || fr2.IPAddress == "" || fr1.IPAddress == fr2.IPAddress) && + return fr1.IPAddress == fr2.IPAddress && fr1.IPProtocol == fr2.IPProtocol && fr1.LoadBalancingScheme == fr2.LoadBalancingScheme && utils.EqualStringSets(fr1.Ports, fr2.Ports) && @@ -344,6 +316,11 @@ func ilbIPToUse(svc *v1.Service, fwdRule *composite.ForwardingRule, requestedSub return svc.Spec.LoadBalancerIP } if fwdRule == nil { + // Reuse the already assigned IP address for this ILB. This is most likely the case + // where ILB protocol changed and the forwarding rule got deleted. + if len(svc.Status.LoadBalancer.Ingress) > 0 { + return svc.Status.LoadBalancer.Ingress[0].IP + } return "" } if requestedSubnet != fwdRule.Subnetwork { @@ -352,11 +329,3 @@ func ilbIPToUse(svc *v1.Service, fwdRule *composite.ForwardingRule, requestedSub } return fwdRule.IPAddress } - -// getAPIVersion returns the API version to use for CRUD of Forwarding rules, given the options enabled. -func getAPIVersion(options gce.ILBOptions) meta.Version { - if options.AllowGlobalAccess { - return meta.VersionBeta - } - return meta.VersionGA -} diff --git a/pkg/loadbalancers/l4.go b/pkg/loadbalancers/l4.go index e4e90a3a96..b84388b7d0 100644 --- a/pkg/loadbalancers/l4.go +++ b/pkg/loadbalancers/l4.go @@ -90,7 +90,7 @@ func (l *L4) EnsureInternalLoadBalancerDeleted(svc *corev1.Service) error { } retErr := err // If any resource deletion fails, log the error and continue cleanup. - if err = utils.IgnoreHTTPNotFound(composite.DeleteForwardingRule(l.cloud, key, getAPIVersion(getILBOptions(l.Service)))); err != nil { + if err = utils.IgnoreHTTPNotFound(composite.DeleteForwardingRule(l.cloud, key, meta.VersionGA)); err != nil { klog.Errorf("Failed to delete forwarding rule for internal loadbalancer service %s, err %v", l.NamespacedName.String(), err) retErr = err } @@ -158,8 +158,12 @@ func (l *L4) EnsureInternalLoadBalancerDeleted(svc *corev1.Service) error { // service. func (l *L4) GetFRName() string { _, _, protocol := utils.GetPortsAndProtocol(l.Service.Spec.Ports) + return l.getFRNameWithProtocol(string(protocol)) +} + +func (l *L4) getFRNameWithProtocol(protocol string) string { lbName := l.namer.VMIPNEG(l.Service.Namespace, l.Service.Name) - return lbName + "-" + strings.ToLower(string(protocol)) + return lbName + "-" + strings.ToLower(protocol) } // EnsureInternalLoadBalancer ensures that all GCE resources for the given loadbalancer service have @@ -220,6 +224,20 @@ func (l *L4) EnsureInternalLoadBalancer(nodeNames []string, svc *corev1.Service) if err != nil { return nil, err } + + // Check if protocol has changed for this service. In this case, forwarding rule should be deleted before + // the backend service can be updated. + existingBS, err := l.backendPool.Get(name, meta.VersionGA, l.scope) + err = utils.IgnoreHTTPNotFound(err) + if err != nil { + klog.Errorf("Failed to lookup existing backend service, ignoring err: %v", err) + } + if existingBS != nil && existingBS.Protocol != string(protocol) { + klog.Infof("Protocol changed from %q to %q for service %s", existingBS.Protocol, string(protocol), l.NamespacedName) + // Delete forwarding rule if it exists + l.deleteForwardingRule(l.getFRNameWithProtocol(existingBS.Protocol), meta.VersionGA) + } + // ensure backend service bs, err := l.backendPool.EnsureL4BackendService(name, hcLink, string(protocol), string(l.Service.Spec.SessionAffinity), string(cloud.SchemeInternal), l.NamespacedName, meta.VersionGA) From e7139465aa0a5b1501f1075eaf64ef3e3d6926bd Mon Sep 17 00:00:00 2001 From: Pavithra Ramesh Date: Fri, 31 Jul 2020 08:20:16 -0700 Subject: [PATCH 2/2] fix unit test and remove duplicate test case. --- pkg/loadbalancers/l4_test.go | 174 ++++++++++++++++++++--------------- 1 file changed, 98 insertions(+), 76 deletions(-) diff --git a/pkg/loadbalancers/l4_test.go b/pkg/loadbalancers/l4_test.go index 0b8e05bb1e..63ab9c9421 100644 --- a/pkg/loadbalancers/l4_test.go +++ b/pkg/loadbalancers/l4_test.go @@ -17,6 +17,8 @@ limitations under the License. */ import ( + "context" + "fmt" "reflect" "strings" "sync" @@ -730,7 +732,7 @@ func TestEnsureInternalLoadBalancerEnableGlobalAccess(t *testing.T) { t.Errorf("Got empty loadBalancer status using handler %v", l) } assertInternalLbResources(t, svc, l, nodeNames) - betaRuleDescString, err := utils.MakeL4ILBServiceDescription(utils.ServiceKeyFunc(svc.Namespace, svc.Name), "1.2.3.0", meta.VersionBeta) + descString, err := utils.MakeL4ILBServiceDescription(utils.ServiceKeyFunc(svc.Namespace, svc.Name), "1.2.3.0", meta.VersionGA) if err != nil { t.Errorf("Unexpected error when creating description - %v", err) } @@ -738,15 +740,15 @@ func TestEnsureInternalLoadBalancerEnableGlobalAccess(t *testing.T) { if err != nil { t.Errorf("Unexpected error when creating key - %v", err) } - fwdRule, err := composite.GetForwardingRule(l.cloud, key, meta.VersionBeta) + fwdRule, err := composite.GetForwardingRule(l.cloud, key, meta.VersionGA) if err != nil { t.Errorf("Unexpected error when looking up forwarding rule - %v", err) } if !fwdRule.AllowGlobalAccess { t.Errorf("Unexpected false value for AllowGlobalAccess") } - if fwdRule.Description != betaRuleDescString { - t.Errorf("Expected description %s, Got %s", betaRuleDescString, fwdRule.Description) + if fwdRule.Description != descString { + t.Errorf("Expected description %s, Got %s", descString, fwdRule.Description) } // remove the annotation and disable global access. delete(svc.Annotations, gce.ServiceAnnotationILBAllowGlobalAccess) @@ -757,13 +759,8 @@ func TestEnsureInternalLoadBalancerEnableGlobalAccess(t *testing.T) { if len(status.Ingress) == 0 { t.Errorf("Got empty loadBalancer status using handler %v", l) } - gaRuleDescString, err := utils.MakeL4ILBServiceDescription(utils.ServiceKeyFunc(svc.Namespace, svc.Name), "1.2.3.0", meta.VersionGA) - if err != nil { - t.Errorf("Unexpected error when creating description - %v", err) - } - // Fetch the beta version of the rule to make sure GlobalAccess field is off. Calling the GA API will always show - // this field as false. - fwdRule, err = composite.GetForwardingRule(l.cloud, key, meta.VersionBeta) + // make sure GlobalAccess field is off. + fwdRule, err = composite.GetForwardingRule(l.cloud, key, meta.VersionGA) if err != nil { t.Errorf("Unexpected error when looking up forwarding rule - %v", err) } @@ -771,71 +768,9 @@ func TestEnsureInternalLoadBalancerEnableGlobalAccess(t *testing.T) { if fwdRule.AllowGlobalAccess { t.Errorf("Unexpected true value for AllowGlobalAccess") } - if fwdRule.Description != gaRuleDescString { - t.Errorf("Expected description %s, Got %s", gaRuleDescString, fwdRule.Description) - } - // Delete the service - err = l.EnsureInternalLoadBalancerDeleted(svc) - if err != nil { - t.Errorf("Unexpected error %v", err) - } - assertInternalLbResourcesDeleted(t, svc, true, l) -} - -func TestEnsureInternalLoadBalancerDisableGlobalAccess(t *testing.T) { - t.Parallel() - - vals := gce.DefaultTestClusterValues() - fakeGCE := getFakeGCECloud(vals) - - nodeNames := []string{"test-node-1"} - svc := test.NewL4ILBService(false, 8080) - namer := namer_util.NewNamer(clusterUID, "") - l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}, fakeMetricsCollector) - _, err := test.CreateAndInsertNodes(l.cloud, nodeNames, vals.ZoneName) - if err != nil { - t.Errorf("Unexpected error when adding nodes %v", err) - } - svc.Annotations[gce.ServiceAnnotationILBAllowGlobalAccess] = "true" - frName := l.GetFRName() - status, err := l.EnsureInternalLoadBalancer(nodeNames, svc) - if err != nil { - t.Errorf("Failed to ensure loadBalancer, err %v", err) + if fwdRule.Description != descString { + t.Errorf("Expected description %s, Got %s", descString, fwdRule.Description) } - if len(status.Ingress) == 0 { - t.Errorf("Got empty loadBalancer status using handler %v", l) - } - assertInternalLbResources(t, svc, l, nodeNames) - key, err := composite.CreateKey(l.cloud, frName, meta.Regional) - if err != nil { - t.Errorf("Unexpected error when creating key - %v", err) - } - - fwdRule, err := composite.GetForwardingRule(l.cloud, key, meta.VersionBeta) - if err != nil { - t.Errorf("Unexpected error when looking up forwarding rule - %v", err) - } - if !fwdRule.AllowGlobalAccess { - t.Errorf("Unexpected false value for AllowGlobalAccess") - } - - // disable global access - setting the annotation to false or removing annotation will disable it - svc.Annotations[gce.ServiceAnnotationILBAllowGlobalAccess] = "false" - status, err = l.EnsureInternalLoadBalancer(nodeNames, svc) - if err != nil { - t.Errorf("Failed to ensure loadBalancer, err %v", err) - } - if len(status.Ingress) == 0 { - t.Errorf("Got empty loadBalancer status using handler %v", l) - } - fwdRule, err = composite.GetForwardingRule(l.cloud, key, meta.VersionBeta) - if err != nil { - t.Errorf("Unexpected error when looking up forwarding rule - %v", err) - } - if fwdRule.AllowGlobalAccess { - t.Errorf("Unexpected 'true' value for AllowGlobalAccess") - } - // Delete the service err = l.EnsureInternalLoadBalancerDeleted(svc) if err != nil { @@ -849,7 +784,6 @@ func TestEnsureInternalLoadBalancerCustomSubnet(t *testing.T) { nodeNames := []string{"test-node-1"} vals := gce.DefaultTestClusterValues() fakeGCE := getFakeGCECloud(vals) - fakeGCE.AlphaFeatureGate = gce.NewAlphaFeatureGate([]string{gce.AlphaFeatureILBCustomSubnet}) svc := test.NewL4ILBService(false, 8080) namer := namer_util.NewNamer(clusterUID, "") @@ -988,6 +922,94 @@ func TestEnsureInternalFirewallPortRanges(t *testing.T) { } } +func TestEnsureInternalLoadBalancerModifyProtocol(t *testing.T) { + t.Parallel() + + vals := gce.DefaultTestClusterValues() + fakeGCE := getFakeGCECloud(vals) + c := fakeGCE.Compute().(*cloud.MockGCE) + nodeNames := []string{"test-node-1"} + svc := test.NewL4ILBService(false, 8080) + namer := namer_util.NewNamer(clusterUID, "") + l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}, fakeMetricsCollector) + _, err := test.CreateAndInsertNodes(l.cloud, nodeNames, vals.ZoneName) + if err != nil { + t.Errorf("Unexpected error when adding nodes %v", err) + } + // This function simulates the error where backend service protocol cannot be changed + // before deleting the forwarding rule. + c.MockRegionBackendServices.UpdateHook = func(ctx context.Context, key *meta.Key, be *compute.BackendService, m *cloud.MockRegionBackendServices) error { + // Check FRnames with both protocols to make sure there is no leak or incorrect update. + frNames := []string{l.getFRNameWithProtocol("TCP"), l.getFRNameWithProtocol("UDP")} + for _, name := range frNames { + key, err := composite.CreateKey(l.cloud, name, meta.Regional) + if err != nil { + return fmt.Errorf("unexpected error when creating key - %v", err) + } + fr, err := c.MockForwardingRules.Get(ctx, key) + if utils.IgnoreHTTPNotFound(err) != nil { + return err + } + if fr != nil && fr.IPProtocol != be.Protocol { + return fmt.Errorf("protocol mismatch between Forwarding Rule value %q and Backend service value %q", fr.IPProtocol, be.Protocol) + } + + } + return mock.UpdateRegionBackendServiceHook(ctx, key, be, m) + } + + frName := l.getFRNameWithProtocol("TCP") + status, err := l.EnsureInternalLoadBalancer(nodeNames, svc) + if err != nil { + t.Errorf("Failed to ensure loadBalancer, err %v", err) + } + if len(status.Ingress) == 0 { + t.Errorf("Got empty loadBalancer status using handler %v", l) + } + key, err := composite.CreateKey(l.cloud, frName, meta.Regional) + if err != nil { + t.Errorf("Unexpected error when creating key - %v", err) + } + fwdRule, err := composite.GetForwardingRule(l.cloud, key, meta.VersionGA) + if err != nil { + t.Errorf("Unexpected error when looking up forwarding rule - %v", err) + } + if fwdRule.IPProtocol != "TCP" { + t.Errorf("Unexpected protocol value %s, expected TCP", fwdRule.IPProtocol) + } + // change the protocol to UDP + svc.Spec.Ports[0].Protocol = v1.ProtocolUDP + status, err = l.EnsureInternalLoadBalancer(nodeNames, svc) + if err != nil { + t.Errorf("Failed to ensure loadBalancer, err %v", err) + } + if len(status.Ingress) == 0 { + t.Errorf("Got empty loadBalancer status using handler %v", l) + } + // Make sure the old forwarding rule is deleted + fwdRule, err = composite.GetForwardingRule(l.cloud, key, meta.VersionGA) + if !utils.IsNotFoundError(err) { + t.Errorf("Failed to delete ForwardingRule %s", frName) + } + frName = l.getFRNameWithProtocol("UDP") + if key, err = composite.CreateKey(l.cloud, frName, meta.Regional); err != nil { + t.Errorf("Unexpected error when creating key - %v", err) + } + if fwdRule, err = composite.GetForwardingRule(l.cloud, key, meta.VersionGA); err != nil { + t.Errorf("Unexpected error when looking up forwarding rule - %v", err) + } + if fwdRule.IPProtocol != "UDP" { + t.Errorf("Unexpected protocol value %s, expected UDP", fwdRule.IPProtocol) + } + + // Delete the service + err = l.EnsureInternalLoadBalancerDeleted(svc) + if err != nil { + t.Errorf("Unexpected error %v", err) + } + assertInternalLbResourcesDeleted(t, svc, true, l) +} + func assertInternalLbResources(t *testing.T, apiService *v1.Service, l *L4, nodeNames []string) { // Check that Firewalls are created for the LoadBalancer and the HealthCheck sharedHC := !servicehelper.RequestsOnlyLocalTraffic(apiService)