Skip to content

Commit

Permalink
Addressed review comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
prameshj committed Dec 31, 2019
1 parent a5e3006 commit 81b9f11
Show file tree
Hide file tree
Showing 12 changed files with 85 additions and 54 deletions.
3 changes: 1 addition & 2 deletions pkg/annotations/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,7 @@ func FromService(obj *v1.Service) *Service {

// ILBAnnotation returns true if L4 ILB annotation is found.
func ILBAnnotation(svc *v1.Service) bool {
lbType, _ := gce.GetLoadBalancerAnnotationType(svc)
return lbType == gce.LBTypeInternal
return gce.GetLoadBalancerAnnotationType(svc) == gce.LBTypeInternal
}

// ApplicationProtocols returns a map of port (name or number) to the protocol
Expand Down
7 changes: 2 additions & 5 deletions pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,11 +500,8 @@ func (c *Controller) mergeVmPrimaryIpNEGsPortInfo(service *apiv1.Service, name t
}
svcPortSet := make(negtypes.SvcPortTupleSet)
svcPortSet.Insert(
negtypes.SvcPortTuple{
Port: 0,
Name: string(negtypes.VmPrimaryIpEndpointType),
TargetPort: "",
},
// Insert Empty PortTuple for VmPrimaryIp NEGs.
negtypes.SvcPortTuple{},
)
return portInfoMap.Merge(negtypes.NewPortInfoMap(name.Namespace, name.Name, svcPortSet, c.namer, false, !helpers.RequestsOnlyLocalTraffic(service)))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/neg/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1105,7 +1105,7 @@ func newTestILBPortInfoMap(ns, name string, randomize bool, namer negtypes.Netwo
svcPortSet.Insert(
negtypes.SvcPortTuple{
Port: 0,
Name: string(negtypes.VmPrimaryIpEndpointType),
Name: "",
TargetPort: "",
},
)
Expand Down
8 changes: 1 addition & 7 deletions pkg/neg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,6 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg
errList = append(errList, err)
}
}
if syncerKey.NegType == negtypes.VmPrimaryIpEndpointType {
// start sync even if there are no endpoints
if !syncer.Sync() {
klog.Errorf("Failed to Sync after startup for VM_IP NEG %s", syncerKey.String())
}
}
}
return utilerrors.NewAggregate(errList)
}
Expand Down Expand Up @@ -338,7 +332,7 @@ func getSyncerKey(namespace, name string, servicePortKey negtypes.PortInfoMapKey
if flags.F.EnableNonGCPMode {
networkEndpointType = negtypes.NonGCPPrivateEndpointType
}
if portInfo.PortTuple.Port == 0 && portInfo.PortTuple.Name == string(negtypes.VmPrimaryIpEndpointType) {
if portInfo.PortTuple.Empty() {
networkEndpointType = negtypes.VmPrimaryIpEndpointType
}

Expand Down
47 changes: 40 additions & 7 deletions pkg/neg/syncers/subsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,38 +38,60 @@ const (

// NodeInfo stores node metadata used to sort nodes and pick a subset.
type NodeInfo struct {
index int
// index stores the index of the given node in the input node list. This is useful to
// identify the node in the list after sorting.
index int
// hashedName is the sha256 hash of the given node name along with a salt.
hashedName string
skip bool
// skip indicates if this node has already been selected in the subset and hence needs
// to be skipped.
skip bool
}

func getHashedName(nodeName, salt string) string {
hashSum := sha256.Sum256([]byte(nodeName + ":" + salt))
return hex.EncodeToString(hashSum[:])
}

// PickSubsetsNoRemovals ensures that there are no node removals from current subset unless the node no longer exists.
// pickSubsetsMinRemovals ensures that there are no node removals from current subset unless the node no longer exists
// or the subset size has reduced. Subset size can reduce if a new zone got added in the cluster and the per-zone limit
// now reduces.
// This function takes a list of nodes, hash salt, count, current set and returns a subset of size - 'count'.
// If the input list is smaller than the desired subset count, the entire list is returned. The hash salt
// is used so that a different subset is returned even when the same node list is passed in, for a different salt value.
// It also keeps the subset relatively stable for the same service.
func PickSubsetsNoRemovals(nodes []*v1.Node, salt string, count int, current []negtypes.NetworkEndpoint) []*v1.Node {
// Example 1 - Recalculate subset, subset size increase.
// nodes = [node1 node2 node3 node4 node5], Current subset - [node3, node2, node5], count 4
// sorted list is [node3 node2 node5 node4 node1]
// Output [node3, node2, node5, node4] - No removals in existing subset.
// ---------------------------------------------------------------------------------------------------------
// Example 2 - Recalculate subset, new node got added.
// nodes = [node1 node2 node3 node4 node5, node6], Current subset - [node3, node2, node5, node4], count 4
// sorted list is [node3 node6 node2 node5 node4 node1]
// Output [node3, node2, node5, node4] - No removals in existing subset even though node6 shows up at a lower index
// in the sorted list.
// ---------------------------------------------------------------------------------------------------------
// Example 2 - Recalculate subset, node3 got removed.
// nodes = [node1 node2 node4 node5, node6], Current subset - [node3, node2, node5, node4], count 4
// sorted list is [node6 node2 node5 node4 node1]
// Output [node2, node5, node4 node6]
func pickSubsetsMinRemovals(nodes []*v1.Node, salt string, count int, current []negtypes.NetworkEndpoint) []*v1.Node {
if len(nodes) < count {
return nodes
}
subset := make([]*v1.Node, 0, count)
info := make([]*NodeInfo, len(nodes))
// Generate hashed names for all cluster nodes and sort them alphabetically, based on the hashed string.
for i, node := range nodes {
info[i] = &NodeInfo{i, getHashedName(node.Name, salt), false}
}
// sort alphabetically, based on the hashed string
sort.Slice(info, func(i, j int) bool {
return info[i].hashedName < info[j].hashedName
})
// Pick all nodes from existing subset if still available.
for _, ep := range current {
curHashName := getHashedName(ep.Node, salt)
for _, nodeInfo := range info {
curHashName := getHashedName(ep.Node, salt)
if nodeInfo.hashedName == curHashName {
subset = append(subset, nodes[nodeInfo.index])
nodeInfo.skip = true
Expand All @@ -79,11 +101,13 @@ func PickSubsetsNoRemovals(nodes []*v1.Node, salt string, count int, current []n
}
}
if len(subset) >= count {
// trim the subset to the given subset size, remove extra nodes.
subset = subset[:count]
return subset
}
for _, val := range info {
if val.skip {
// This node was already picked as it is part of the current subset.
continue
}
subset = append(subset, nodes[val.index])
Expand All @@ -94,6 +118,9 @@ func PickSubsetsNoRemovals(nodes []*v1.Node, salt string, count int, current []n
return subset
}

// getSubsetPerZone groups the given list of nodes by zone, ensuring that there is a
// non-zero subset from each zone.
// The output is a map of zone string to NEG subset.
func getSubsetPerZone(nodes []*v1.Node, zoneGetter negtypes.ZoneGetter, svcID string, currentMap map[string]negtypes.NetworkEndpointSet, newEpCount int, randomize bool) (map[string]negtypes.NetworkEndpointSet, error) {
result := make(map[string]negtypes.NetworkEndpointSet)
zoneMap := make(map[string][]*v1.Node)
Expand All @@ -113,6 +140,7 @@ func getSubsetPerZone(nodes []*v1.Node, zoneGetter negtypes.ZoneGetter, svcID st
for _, set := range currentMap {
currentEpCount += set.Len()
}
// subsetSize is equal to numZones if there are no endpoints, so one node from each zone is selected.
subsetSize := getSubsetCount(currentEpCount, newEpCount, numZones, randomize)
// This algorithm picks atmost 'perZoneSubset' number of nodes from each zone.
// If there are fewer nodes in one zone, more nodes are NOT picked from other zones.
Expand All @@ -129,14 +157,19 @@ func getSubsetPerZone(nodes []*v1.Node, zoneGetter negtypes.ZoneGetter, svcID st
currentList = nil
}
}
subset := PickSubsetsNoRemovals(nodesInZone, svcID, perZoneSubset, currentList)
subset := pickSubsetsMinRemovals(nodesInZone, svcID, perZoneSubset, currentList)
for _, node := range subset {
result[zone].Insert(negtypes.NetworkEndpoint{Node: node.Name, IP: utils.GetNodePrimaryIP(node)})
}
}
return result, nil
}

// getSubsetCount computes the size of the subset based on input parameters.
// If there are no endpoints, the subset count is same as the number of cluster zones.
// In the non-random mode, the subset size is equal to the endpoint count, bound by a limit.
// In the random mode, the subset is equal to the current or new endpoint count, whichever is larger. This is also
// bound by a limit.
func getSubsetCount(currentCount, newCount, numZones int, randomize bool) int {
if newCount == 0 {
// no endpoints for this service, use the zone count and pick one node per zone.
Expand Down
16 changes: 8 additions & 8 deletions pkg/neg/syncers/subsets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ func TestBasicSubset(t *testing.T) {
{ObjectMeta: metav1.ObjectMeta{Name: "node25"}},
}
count := 3
subset1 := PickSubsetsNoRemovals(nodes, "svc123", count, nil)
subset1 := pickSubsetsMinRemovals(nodes, "svc123", count, nil)
if len(subset1) < 3 {
t.Errorf("Expected %d subsets, got only %d - %v", count, len(subset1), subset1)
}
if !validateSubset(subset1, nodes) {
t.Errorf("Invalid subset list %v from %v", subset1, nodes)
}
subset2 := PickSubsetsNoRemovals(nodes, "svc345", count, nil)
subset3 := PickSubsetsNoRemovals(nodes, "svc56", count, nil)
subset2 := pickSubsetsMinRemovals(nodes, "svc345", count, nil)
subset3 := pickSubsetsMinRemovals(nodes, "svc56", count, nil)
t.Logf("Subset2 is %s", nodeNames(subset2))
t.Logf("Subset3 is %s", nodeNames(subset3))
if isIdentical(subset1, subset2) || isIdentical(subset3, subset2) || isIdentical(subset1, subset3) {
Expand All @@ -54,7 +54,7 @@ func TestBasicSubset(t *testing.T) {
func TestEmptyNodes(t *testing.T) {
t.Parallel()
count := 3
subset1 := PickSubsetsNoRemovals(nil, "svc123", count, nil)
subset1 := pickSubsetsMinRemovals(nil, "svc123", count, nil)
if len(subset1) != 0 {
t.Errorf("Expected empty subset, got - %s", nodeNames(subset1))
}
Expand All @@ -71,7 +71,7 @@ func TestFewerNodes(t *testing.T) {
{ObjectMeta: metav1.ObjectMeta{Name: "node25"}},
}
count := 10
subset1 := PickSubsetsNoRemovals(nodes, "svc123", count, nil)
subset1 := pickSubsetsMinRemovals(nodes, "svc123", count, nil)
if len(subset1) != len(nodes) {
t.Errorf("Expected subset of length %d, got %d, subsets - %s", len(nodes), len(subset1), nodeNames(subset1))
}
Expand All @@ -90,14 +90,14 @@ func TestNoRemovals(t *testing.T) {
{ObjectMeta: metav1.ObjectMeta{Name: "node25"}},
}
count := 5
subset1 := PickSubsetsNoRemovals(nodes, "svc123", count, nil)
subset1 := pickSubsetsMinRemovals(nodes, "svc123", count, nil)
if len(subset1) < 5 {
t.Errorf("Expected %d subsets, got only %d - %v", count, len(subset1), subset1)
}
// nodeName abcd shows up 2nd in the sorted list for the given salt. So picking a subset of 5 will remove one of the
// existing nodes.
nodes = append(nodes, &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node:abcd"}})
subset2 := PickSubsetsNoRemovals(nodes, "svc123", count, nil)
subset2 := pickSubsetsMinRemovals(nodes, "svc123", count, nil)
if len(subset2) < 5 {
t.Errorf("Expected %d subsets, got only %d - %v", count, len(subset2), subset2)
}
Expand All @@ -108,7 +108,7 @@ func TestNoRemovals(t *testing.T) {
for _, node := range subset1 {
existingEp = append(existingEp, types.NetworkEndpoint{Node: node.Name})
}
subset3 := PickSubsetsNoRemovals(nodes, "svc123", count, existingEp)
subset3 := pickSubsetsMinRemovals(nodes, "svc123", count, existingEp)
if len(subset3) < 5 {
t.Errorf("Expected %d subsets, got only %d - %v", count, len(subset3), subset3)
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ type transactionSyncer struct {
cloud negtypes.NetworkEndpointGroupCloud
zoneGetter negtypes.ZoneGetter

// This only applies in the GCE_VM_PRIMARY_IP NEG.
// randomize indicates that the endpoints of the NEG can be picked at random, rather
// than following the endpoints of the service. This only applies in the GCE_VM_PRIMARY_IP NEG
// than following the endpoints of the service.
randomize bool

// retry handles back off retry for NEG API operations
Expand Down Expand Up @@ -164,7 +165,7 @@ func (s *transactionSyncer) syncInternal() error {
addEndpoints, removeEndpoints := calculateNetworkEndpointDifference(targetMap, currentMap)
if s.randomize && len(removeEndpoints) > 0 {
// Make removals minimum since the traffic will be abruptly stopped. Log removals
klog.Infof("Removing endpoints %+v from GCE_VM_PRIMARY_IP NEG %s", removeEndpoints, s.negName)
klog.V(3).Infof("Removing endpoints %+v from GCE_VM_PRIMARY_IP NEG %s", removeEndpoints, s.negName)
}
// Calculate Pods that are already in the NEG
_, committedEndpoints := calculateNetworkEndpointDifference(addEndpoints, targetMap)
Expand All @@ -177,9 +178,8 @@ func (s *transactionSyncer) syncInternal() error {
// filter out the endpoints that are in transaction
filterEndpointByTransaction(committedEndpoints, s.transactions)

if s.NegSyncerKey.NegType != negtypes.VmPrimaryIpEndpointType {
s.commitPods(committedEndpoints, endpointPodMap)
}
// no-op in case of VmPrimaryIp NEGs.
s.commitPods(committedEndpoints, endpointPodMap)

if len(addEndpoints) == 0 && len(removeEndpoints) == 0 {
klog.V(4).Infof("No endpoint change for %s/%s, skip syncing NEG. ", s.Namespace, s.Name)
Expand Down
10 changes: 8 additions & 2 deletions pkg/neg/syncers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,16 +170,22 @@ func ensureNetworkEndpointGroup(svcNamespace, svcName, negName, zone, negService
return nil
}

// toZonePrimaryIPEndpointMap computes updated endpojnts for GCE_VM_PRIMARY_IP NEG.
// It takes list of service endpoints, current NEG endpojnts and computes the updated NEG endpoints.
// In the non-random mode, nodes running the service endpoints are selected as the subset. In the random mode, the node
// names are hashed with a salt and sorted in order to identify the subset. Removals are kept to a minimum in this mode.
// This is to reduce the disruption to existing ILB connections, when nodes are added to the cluster causing a change in
// the subset.
func toZonePrimaryIPEndpointMap(endpoints *apiv1.Endpoints, nodeLister listers.NodeLister, zoneGetter negtypes.ZoneGetter, randomize bool, currentMap map[string]negtypes.NetworkEndpointSet, serviceKey string) (map[string]negtypes.NetworkEndpointSet, error) {
if randomize {
// Pick any nodes as subset
// In this mode, any of the cluster nodes can be used to pick the subset
nodes, _ := nodeLister.ListWithPredicate(utils.GetNodeConditionPredicate())
return getSubsetPerZone(nodes, zoneGetter, serviceKey, currentMap, utils.NumEndpoints(endpoints), true)
}
// pick the nodes where the service endpoints are located.
nodes := []*v1.Node{}
nodeNames := sets.String{}
epCount := 0
// Follow endpoints and pick a subset
for _, ep := range endpoints.Subsets {
for _, addr := range ep.Addresses {
if addr.NodeName == nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/neg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ type SvcPortTuple struct {
TargetPort string
}

func (t SvcPortTuple) Empty() bool {
return t.Port == 0 && t.Name == "" && t.TargetPort == ""
}

// String returns the string representation of SvcPortTuple
func (t SvcPortTuple) String() string {
return fmt.Sprintf("%s/%v-%s", t.Name, t.Port, t.TargetPort)
Expand Down
5 changes: 5 additions & 0 deletions pkg/utils/common/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ const (
// FinalizerKeyV2 is the string representing the Ingress finalizer version.
// Ingress with V2 finalizer uses V2 frontend naming scheme.
FinalizerKeyV2 = "networking.gke.io/ingress-finalizer-V2"
// FinalizerKeyL4 is the string representing the L4 ILB controller finalizer in this repo.
FinalizerKeyL4 = "networking.gke.io/l4-ilb-v2"
// FinalizerKeyL4V1 is the string representing the service controller finalizer. A service with this finalizer
// is managed by k/k service controller.
FinalizerKeyL4V1 = "networking.gke.io/l4-ilb-v1"
)

// IsDeletionCandidate is true if the passed in meta contains an ingress finalizer.
Expand Down
13 changes: 6 additions & 7 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,12 +432,11 @@ func NumEndpoints(ep *api_v1.Endpoints) (result int) {

// IsLegacyL4ILBService returns true if the given LoadBalancer service is managed by service controller.
func IsLegacyL4ILBService(g *gce.Cloud, svc *api_v1.Service) bool {
// ctx and clusterName parameters are unused.
loadBalancerName := g.GetLoadBalancerName(nil, "", svc)
_, err := g.GetRegionForwardingRule(loadBalancerName, g.Region())
if err != nil && IsNotFoundError(err) {
// legacy forwarding rule not found.
return false
for _, key := range svc.ObjectMeta.Finalizers {
if key == common.FinalizerKeyL4V1 {
// service has v1 finalizer, this is handled by service controller code.
return true
}
}
return true
return false
}
14 changes: 4 additions & 10 deletions pkg/utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import (
"testing"
"time"

compute "google.golang.org/api/compute/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/flags"
"k8s.io/ingress-gce/pkg/utils/common"
Expand Down Expand Up @@ -763,6 +762,7 @@ func TestIsLegacyL4ILBService(t *testing.T) {
Name: "testsvc",
Namespace: "default",
Annotations: map[string]string{gce.ServiceAnnotationLoadBalancerType: string(gce.LBTypeInternal)},
Finalizers: []string{common.FinalizerKeyL4V1},
},
Spec: api_v1.ServiceSpec{
Type: api_v1.ServiceTypeLoadBalancer,
Expand All @@ -772,18 +772,12 @@ func TestIsLegacyL4ILBService(t *testing.T) {
},
}
g := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
lbName := g.GetLoadBalancerName(nil, "", svc)
err := g.CreateRegionForwardingRule(&compute.ForwardingRule{Name: lbName}, g.Region())
if err != nil {
t.Errorf("Failed to create test forwarding rule - err %v", err)
}
if !IsLegacyL4ILBService(g, svc) {
t.Errorf("Expected True for Legacy service %s, got False", svc.Name)
}

// Change the service name and UID and ensure the check returns False.
svc.Name = "test123"
svc.UID = "test123-uid"
// Remove the finalizer and ensure the check returns False.
svc.ObjectMeta.Finalizers = nil
if IsLegacyL4ILBService(g, svc) {
t.Errorf("Expected False for Legacy service %s, got True", svc.Name)
}
Expand Down

0 comments on commit 81b9f11

Please sign in to comment.