Skip to content

Commit

Permalink
Merge pull request #920 from cxhiano/neg_controller_non_gcp_mode
Browse files Browse the repository at this point in the history
Add a flag for Non-GCP mode NEG controller.
  • Loading branch information
k8s-ci-robot committed Oct 29, 2019
2 parents 1707686 + a30f847 commit 212e82a
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 46 deletions.
2 changes: 2 additions & 0 deletions pkg/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ var (
EnableL7Ilb bool
EnableCSM bool
CSMServiceNEGSkipNamespaces []string
EnableNonGCPMode bool

LeaderElection LeaderElectionConfiguration
}{}
Expand Down Expand Up @@ -202,6 +203,7 @@ L7 load balancing. CSV values accepted. Example: -node-port-ranges=80,8080,400-5
`Optional, whether or not to enable L7-ILB.`)
flag.BoolVar(&F.EnableCSM, "enable-csm", false, "Enable CSM(Istio) support")
flag.StringSliceVar(&F.CSMServiceNEGSkipNamespaces, "csm-service-skip-namespaces", []string{}, "Only for CSM mode, skip the NEG creation for Services in the given namespaces.")
flag.BoolVar(&F.EnableNonGCPMode, "enable-non-gcp-mode", false, "Set to true when running on a non-GCP cluster.")
}

type RateLimitSpecs struct {
Expand Down
7 changes: 7 additions & 0 deletions pkg/neg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/ingress-gce/pkg/flags"
"k8s.io/ingress-gce/pkg/neg/readiness"
negsyncer "k8s.io/ingress-gce/pkg/neg/syncers"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
Expand Down Expand Up @@ -113,9 +114,15 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg
syncerKey := getSyncerKey(namespace, name, svcPort, portInfo)
syncer, ok := manager.syncerMap[syncerKey]
if !ok {
networkEndpointType := negtypes.VMNetworkEndpointType
if flags.F.EnableNonGCPMode {
networkEndpointType = negtypes.NonGCPPrivateEndpointType
}

syncer = negsyncer.NewTransactionSyncer(
syncerKey,
portInfo.NegName,
networkEndpointType,
manager.recorder,
manager.cloud,
manager.zoneGetter,
Expand Down
27 changes: 15 additions & 12 deletions pkg/neg/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,19 +314,22 @@ func TestGarbageCollectionNEG(t *testing.T) {
t.Fatalf("Failed to ensure syncer: %v", err)
}

negName := manager.namer.NEG("test", "test", 80)
manager.cloud.CreateNetworkEndpointGroup(&compute.NetworkEndpointGroup{
Name: negName,
}, negtypes.TestZone1)

if err := manager.GC(); err != nil {
t.Fatalf("Failed to GC: %v", err)
}
for _, networkEndpointType := range []negtypes.NetworkEndpointType{negtypes.VMNetworkEndpointType, negtypes.NonGCPPrivateEndpointType} {
negName := manager.namer.NEG("test", "test", 80)
manager.cloud.CreateNetworkEndpointGroup(&compute.NetworkEndpointGroup{
Name: negName,
NetworkEndpointType: string(networkEndpointType),
}, negtypes.TestZone1)

if err := manager.GC(); err != nil {
t.Fatalf("Failed to GC: %v", err)
}

negs, _ := manager.cloud.ListNetworkEndpointGroup(negtypes.TestZone1)
for _, neg := range negs {
if neg.Name == negName {
t.Errorf("Expect NEG %q to be GCed.", negName)
negs, _ := manager.cloud.ListNetworkEndpointGroup(negtypes.TestZone1)
for _, neg := range negs {
if neg.Name == negName {
t.Errorf("Expect NEG %q to be GCed.", negName)
}
}
}

Expand Down
31 changes: 17 additions & 14 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type transactionSyncer struct {
// metadata
negtypes.NegSyncerKey
negName string
// The type of the network endpoints in this NEG.
networkEndpointType negtypes.NetworkEndpointType

// syncer provides syncer life cycle interfaces
syncer negtypes.NegSyncer
Expand Down Expand Up @@ -65,20 +67,21 @@ type transactionSyncer struct {
reflector readiness.Reflector
}

func NewTransactionSyncer(negSyncerKey negtypes.NegSyncerKey, networkEndpointGroupName string, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter negtypes.ZoneGetter, podLister cache.Indexer, serviceLister cache.Indexer, endpointLister cache.Indexer, reflector readiness.Reflector) negtypes.NegSyncer {
func NewTransactionSyncer(negSyncerKey negtypes.NegSyncerKey, networkEndpointGroupName string, networkEndpointType negtypes.NetworkEndpointType, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter negtypes.ZoneGetter, podLister cache.Indexer, serviceLister cache.Indexer, endpointLister cache.Indexer, reflector readiness.Reflector) negtypes.NegSyncer {
// TransactionSyncer implements the syncer core
ts := &transactionSyncer{
NegSyncerKey: negSyncerKey,
negName: networkEndpointGroupName,
needInit: true,
transactions: NewTransactionTable(),
podLister: podLister,
serviceLister: serviceLister,
endpointLister: endpointLister,
recorder: recorder,
cloud: cloud,
zoneGetter: zoneGetter,
reflector: reflector,
NegSyncerKey: negSyncerKey,
negName: networkEndpointGroupName,
networkEndpointType: networkEndpointType,
needInit: true,
transactions: NewTransactionTable(),
podLister: podLister,
serviceLister: serviceLister,
endpointLister: endpointLister,
recorder: recorder,
cloud: cloud,
zoneGetter: zoneGetter,
reflector: reflector,
}
// Syncer implements life cycle logic
syncer := newSyncer(negSyncerKey, networkEndpointGroupName, serviceLister, recorder, ts)
Expand Down Expand Up @@ -130,7 +133,7 @@ func (s *transactionSyncer) syncInternal() error {
return nil
}

targetMap, endpointPodMap, err := toZoneNetworkEndpointMap(ep.(*apiv1.Endpoints), s.zoneGetter, s.TargetPort, s.podLister, s.NegSyncerKey.SubsetLabels)
targetMap, endpointPodMap, err := toZoneNetworkEndpointMap(ep.(*apiv1.Endpoints), s.zoneGetter, s.TargetPort, s.podLister, s.NegSyncerKey.SubsetLabels, s.networkEndpointType)
if err != nil {
return err
}
Expand Down Expand Up @@ -178,7 +181,7 @@ func (s *transactionSyncer) ensureNetworkEndpointGroups() error {

var errList []error
for _, zone := range zones {
if err := ensureNetworkEndpointGroup(s.Namespace, s.Name, s.negName, zone, s.NegSyncerKey.String(), s.cloud, s.serviceLister, s.recorder); err != nil {
if err := ensureNetworkEndpointGroup(s.Namespace, s.Name, s.negName, zone, s.NegSyncerKey.String(), s.networkEndpointType, s.cloud, s.serviceLister, s.recorder); err != nil {
errList = append(errList, err)
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/neg/syncers/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,7 @@ func newTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud) (negty

negsyncer := NewTransactionSyncer(svcPort,
testNegName,
negtypes.VMNetworkEndpointType,
record.NewFakeRecorder(100),
fakeGCE,
negtypes.NewFakeZoneGetter(),
Expand Down
35 changes: 24 additions & 11 deletions pkg/neg/syncers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,10 @@ const (
MAX_NETWORK_ENDPOINTS_PER_BATCH = 500
// For each NEG, only retries 15 times to process it.
// This is a convention in kube-controller-manager.
maxRetries = 15
minRetryDelay = 5 * time.Second
maxRetryDelay = 600 * time.Second
separator = "||"
negIPPortNetworkEndpointType = "GCE_VM_IP_PORT"
maxRetries = 15
minRetryDelay = 5 * time.Second
maxRetryDelay = 600 * time.Second
separator = "||"
)

// encodeEndpoint encodes ip and instance into a single string
Expand Down Expand Up @@ -113,7 +112,7 @@ func getService(serviceLister cache.Indexer, namespace, name string) *apiv1.Serv
}

// ensureNetworkEndpointGroup ensures corresponding NEG is configured correctly in the specified zone.
func ensureNetworkEndpointGroup(svcNamespace, svcName, negName, zone, negServicePortName string, cloud negtypes.NetworkEndpointGroupCloud, serviceLister cache.Indexer, recorder record.EventRecorder) error {
func ensureNetworkEndpointGroup(svcNamespace, svcName, negName, zone, negServicePortName string, networkEndpointType negtypes.NetworkEndpointType, cloud negtypes.NetworkEndpointGroupCloud, serviceLister cache.Indexer, recorder record.EventRecorder) error {
neg, err := cloud.GetNetworkEndpointGroup(negName, zone)
if err != nil {
// Most likely to be caused by non-existed NEG
Expand All @@ -123,8 +122,11 @@ func ensureNetworkEndpointGroup(svcNamespace, svcName, negName, zone, negService
needToCreate := false
if neg == nil {
needToCreate = true
} else if !utils.EqualResourceIDs(neg.Network, cloud.NetworkURL()) ||
!utils.EqualResourceIDs(neg.Subnetwork, cloud.SubnetworkURL()) {
} else if networkEndpointType != negtypes.NonGCPPrivateEndpointType &&
// Only perform the following checks when the NEGs are not Non-GCP NEGs.
// Non-GCP NEGs do not have associated network and subnetwork.
(!utils.EqualResourceIDs(neg.Network, cloud.NetworkURL()) ||
!utils.EqualResourceIDs(neg.Subnetwork, cloud.SubnetworkURL())) {
needToCreate = true
klog.V(2).Infof("NEG %q in %q does not match network and subnetwork of the cluster. Deleting NEG.", negName, zone)
err = cloud.DeleteNetworkEndpointGroup(negName, zone)
Expand All @@ -141,11 +143,18 @@ func ensureNetworkEndpointGroup(svcNamespace, svcName, negName, zone, negService

if needToCreate {
klog.V(2).Infof("Creating NEG %q for %s in %q.", negName, negServicePortName, zone)
var subnetwork string
switch networkEndpointType {
case negtypes.NonGCPPrivateEndpointType:
subnetwork = ""
default:
subnetwork = cloud.SubnetworkURL()
}
err = cloud.CreateNetworkEndpointGroup(&compute.NetworkEndpointGroup{
Name: negName,
NetworkEndpointType: negIPPortNetworkEndpointType,
NetworkEndpointType: string(networkEndpointType),
Network: cloud.NetworkURL(),
Subnetwork: cloud.SubnetworkURL(),
Subnetwork: subnetwork,
}, zone)
if err != nil {
return err
Expand All @@ -161,7 +170,7 @@ func ensureNetworkEndpointGroup(svcNamespace, svcName, negName, zone, negService
}

// toZoneNetworkEndpointMap translates addresses in endpoints object and Istio:DestinationRule subset into zone and endpoints map
func toZoneNetworkEndpointMap(endpoints *apiv1.Endpoints, zoneGetter negtypes.ZoneGetter, targetPort string, podLister cache.Indexer, subsetLables string) (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap, error) {
func toZoneNetworkEndpointMap(endpoints *apiv1.Endpoints, zoneGetter negtypes.ZoneGetter, targetPort string, podLister cache.Indexer, subsetLables string, networkEndpointType negtypes.NetworkEndpointType) (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap, error) {
zoneNetworkEndpointMap := map[string]negtypes.NetworkEndpointSet{}
networkEndpointPodMap := negtypes.EndpointPodMap{}
if endpoints == nil {
Expand Down Expand Up @@ -227,6 +236,10 @@ func toZoneNetworkEndpointMap(endpoints *apiv1.Endpoints, zoneGetter negtypes.Zo

if includeAllEndpoints || shouldPodBeInNeg(podLister, address.TargetRef.Namespace, address.TargetRef.Name) {
networkEndpoint := negtypes.NetworkEndpoint{IP: address.IP, Port: matchPort, Node: *address.NodeName}
if networkEndpointType == negtypes.NonGCPPrivateEndpointType {
// Non-GCP network endpoints don't have associated nodes.
networkEndpoint.Node = ""
}
zoneNetworkEndpointMap[zone].Insert(networkEndpoint)
networkEndpointPodMap[networkEndpoint] = types.NamespacedName{Namespace: address.TargetRef.Namespace, Name: address.TargetRef.Name}
}
Expand Down
124 changes: 115 additions & 9 deletions pkg/neg/syncers/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (

"fmt"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
"google.golang.org/api/compute/v1"
apiv1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -293,6 +295,85 @@ func TestNetworkEndpointCalculateDifference(t *testing.T) {
}
}

func TestEnsureNetworkEndpointGroup(t *testing.T) {
var (
testZone = "test-zone"
testNamedPort = "named-port"
testServiceName = "test-svc"
testServiceNameSpace = "test-ns"
testNetwork = cloud.ResourcePath("network", &meta.Key{Zone: testZone, Name: "test-network"})
testSubnetwork = cloud.ResourcePath("subnetwork", &meta.Key{Zone: testZone, Name: "test-subnetwork"})
)

fakeCloud := negtypes.NewFakeNetworkEndpointGroupCloud(testSubnetwork, testNetwork)

testCases := []struct {
description string
negName string
enableNonGCPMode bool
networkEndpointType negtypes.NetworkEndpointType
expectedSubnetwork string
}{
{
description: "Create NEG of type GCE_VM_IP_PORT",
negName: "gcp-neg",
enableNonGCPMode: false,
networkEndpointType: negtypes.VMNetworkEndpointType,
expectedSubnetwork: testSubnetwork,
},
{
description: "Create NEG of type NON_GCP_PRIVATE_IP_PORT",
negName: "non-gcp-neg",
enableNonGCPMode: true,
networkEndpointType: negtypes.NonGCPPrivateEndpointType,
expectedSubnetwork: "",
},
}
for _, tc := range testCases {
ensureNetworkEndpointGroup(
testServiceNameSpace,
testServiceName,
tc.negName,
testZone,
testNamedPort,
tc.networkEndpointType,
fakeCloud,
nil,
nil,
)

neg, err := fakeCloud.GetNetworkEndpointGroup(tc.negName, testZone)
if err != nil {
t.Errorf("Failed to retrieve NEG %q: %v", tc.negName, err)
}

if neg.NetworkEndpointType != string(tc.networkEndpointType) {
t.Errorf("Unexpected NetworkEndpointType, expecting %q but got %q", tc.networkEndpointType, neg.NetworkEndpointType)
}

if neg.Subnetwork != tc.expectedSubnetwork {
t.Errorf("Unexpected Subnetwork, expecting %q but got %q", tc.expectedSubnetwork, neg.Subnetwork)
}

// Call ensureNetworkEndpointGroup with the same NEG.
err = ensureNetworkEndpointGroup(
testServiceNameSpace,
testServiceName,
tc.negName,
testZone,
testNamedPort,
tc.networkEndpointType,
fakeCloud,
nil,
nil,
)

if err != nil {
t.Errorf("Unexpected error when called with duplicated NEG: %v", err)
}
}
}

func TestToZoneNetworkEndpointMapUtil(t *testing.T) {
t.Parallel()
_, transactionSyncer := newTestTransactionSyncer(negtypes.NewAdapter(gce.NewFakeGCECloud(gce.DefaultTestClusterValues())))
Expand Down Expand Up @@ -326,16 +407,18 @@ func TestToZoneNetworkEndpointMapUtil(t *testing.T) {

zoneGetter := negtypes.NewFakeZoneGetter()
testCases := []struct {
desc string
targetPort string
endpointSets map[string]negtypes.NetworkEndpointSet
expectMap negtypes.EndpointPodMap
desc string
targetPort string
endpointSets map[string]negtypes.NetworkEndpointSet
expectMap negtypes.EndpointPodMap
networkEndpointType negtypes.NetworkEndpointType
}{
{
desc: "non exist target port",
targetPort: "8888",
endpointSets: map[string]negtypes.NetworkEndpointSet{},
expectMap: negtypes.EndpointPodMap{},
desc: "non exist target port",
targetPort: "8888",
endpointSets: map[string]negtypes.NetworkEndpointSet{},
expectMap: negtypes.EndpointPodMap{},
networkEndpointType: negtypes.VMNetworkEndpointType,
},
{
desc: "target port number",
Expand All @@ -356,6 +439,7 @@ func TestToZoneNetworkEndpointMapUtil(t *testing.T) {
networkEndpointFromEncodedEndpoint("10.100.3.1||instance3||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod4"},
networkEndpointFromEncodedEndpoint("10.100.1.3||instance1||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod5"},
},
networkEndpointType: negtypes.VMNetworkEndpointType,
},
{
desc: "named target port",
Expand All @@ -376,11 +460,33 @@ func TestToZoneNetworkEndpointMapUtil(t *testing.T) {
networkEndpointFromEncodedEndpoint("10.100.3.2||instance3||8081"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod10"},
networkEndpointFromEncodedEndpoint("10.100.4.2||instance4||8081"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod11"},
},
networkEndpointType: negtypes.VMNetworkEndpointType,
},
{
desc: "Non-GCP network endpoints",
targetPort: "80",
endpointSets: map[string]negtypes.NetworkEndpointSet{
negtypes.TestZone1: negtypes.NewNetworkEndpointSet(
networkEndpointFromEncodedEndpoint("10.100.1.1||||80"),
networkEndpointFromEncodedEndpoint("10.100.1.2||||80"),
networkEndpointFromEncodedEndpoint("10.100.2.1||||80"),
networkEndpointFromEncodedEndpoint("10.100.1.3||||80")),
negtypes.TestZone2: negtypes.NewNetworkEndpointSet(
networkEndpointFromEncodedEndpoint("10.100.3.1||||80")),
},
expectMap: negtypes.EndpointPodMap{
networkEndpointFromEncodedEndpoint("10.100.1.1||||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod1"},
networkEndpointFromEncodedEndpoint("10.100.1.2||||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod2"},
networkEndpointFromEncodedEndpoint("10.100.2.1||||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod3"},
networkEndpointFromEncodedEndpoint("10.100.3.1||||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod4"},
networkEndpointFromEncodedEndpoint("10.100.1.3||||80"): types.NamespacedName{Namespace: testServiceNamespace, Name: "pod5"},
},
networkEndpointType: negtypes.NonGCPPrivateEndpointType,
},
}

for _, tc := range testCases {
retSet, retMap, err := toZoneNetworkEndpointMap(getDefaultEndpoint(), zoneGetter, tc.targetPort, podLister, "")
retSet, retMap, err := toZoneNetworkEndpointMap(getDefaultEndpoint(), zoneGetter, tc.targetPort, podLister, "", tc.networkEndpointType)
if err != nil {
t.Errorf("For case %q, expect nil error, but got %v.", tc.desc, err)
}
Expand Down
Loading

0 comments on commit 212e82a

Please sign in to comment.