Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

K8s-NEG Integration #48

Merged
merged 7 commits into from
Oct 27, 2017
Merged

K8s-NEG Integration #48

merged 7 commits into from
Oct 27, 2017

Conversation

freehan
Copy link
Contributor

@freehan freehan commented Oct 11, 2017

End-to-end K8s-NEG integration

@k8s-ci-robot k8s-ci-robot added cncf-cla: yes Indicates the PR's author has signed the CNCF CLA. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. labels Oct 11, 2017
@freehan
Copy link
Contributor Author

freehan commented Oct 11, 2017

/assign bowei
/assign nicksardo
/assign thockin

@freehan
Copy link
Contributor Author

freehan commented Oct 12, 2017

cc @nikhiljindal

@freehan
Copy link
Contributor Author

freehan commented Oct 16, 2017

cloud provider change was merged: #54
Rebased on top of that. Now this PR is clean.

cmd/glbc/main.go Outdated
// Start loadbalancer controller
lbc, err := controller.NewLoadBalancerController(kubeClient, ctx, clusterManager)
lbc, err := controller.NewLoadBalancerController(kubeClient, ctx, clusterManager, cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureNetworkEndpointGroup))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is negEnabled passed to NewLoadBalancerController when it's accessible in the ctx?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ControllerContext does not have a flag for negEnabled. It has a flag for enabling endpoint informer.

cmd/glbc/main.go Outdated
@@ -301,6 +302,13 @@ func main() {
glog.V(3).Infof("Cluster name %+v", clusterManager.ClusterNamer.GetClusterName())
}
clusterManager.Init(&controller.GCETranslator{LoadBalancerController: lbc})

// Start NEG controller
if cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureNetworkEndpointGroup) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we determine this value once at the start of the file instead of recalling this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

Copy link
Contributor

@nicksardo nicksardo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The HealthCheck changes divert from the existing design pattern and introduces complexity into consumer packages. If we needed to add support for HTTP2HealthChecks soon, it would be more difficult with this code.

Protocol utils.AppProtocol
SvcName types.NamespacedName
SvcPort intstr.IntOrString
Port int64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you comment that this is NodePort. Also, add TODO for changing the name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

b := &computealpha.Backend{
Group: neg.SelfLink,
BalancingMode: string(Rate),
MaxRate: maxRPS,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Believe we should use MaxRatePerEndpoint to be consistent with MaxRatePerInstance


negName := b.namer.NEGName(port.SvcName.Namespace, port.SvcName.Name, port.SvcTargetPort)

negs := []*computealpha.NetworkEndpointGroup{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: var negs []*computealpha.NetworkEndpointGroup

negs = append(negs, neg)
}

backendService, err := b.cloud.GetAlphaGlobalBackendService(b.namer.BeName(port.Port))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have a note somewhere saying the services must still be type NodePort? May be ideal to have a neg-design.md file that documents behavior, caveats, and TODOs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a NEG to beta doc that included this.

for _, backend := range backendService.Backends {
found := false
for _, negBackend := range targetBackends {
// Warnning: Group link includes the api version.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sp

needToUpdate := len(backendService.Backends) == 0
for _, backend := range backendService.Backends {
found := false
for _, negBackend := range targetBackends {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear what exactly you want from this logic. Do you want set equality? Right now, you're only testing whether the targetBackends exist in the current set - not whether there are extraneous backends in the current set.

Could we use sets.NewString() and build two separate sets and test for equality? The code would be smaller, fewer indents, & easier to grok.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack

hc.Description = description
hc.CheckIntervalSec = checkInterval
hc.TimeoutSec = timeout
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a large problem with how the alpha health check is incorporated, and this exemplifies why. The point of the HealthCheck struct is to present all parameters at the root level so other packages do not need to have this conditional mess.

@@ -174,6 +266,7 @@ func DefaultHealthCheck(port int64, protocol utils.AppProtocol) *HealthCheck {
type HealthCheck struct {
compute.HTTPHealthCheck
compute.HealthCheck
AlphaHealthCheck *computealpha.HealthCheck
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than just adding this as a field. We should change the struct to:

type HealthCheck struct {
  	computealpha.HTTPHealthCheck
  	computealpha.HealthCheck

We know that computealpha.HealthCheck and computealpha.HTTPHealthCheck are supersets of their compute. variants, so let's use them to store everything. It just means that when we retrieve an existing health check or output a health check struct, we need two more conversion functions. Just because we're using the alpha structs here does not mean we need to always use the alpha-API. We're just using them as structs which are compatible with both alpha and GA structs.

Copy link
Contributor Author

@freehan freehan Oct 19, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM. I was afraid that it will mess up with existing logic.

@freehan
Copy link
Contributor Author

freehan commented Oct 20, 2017

Fixed the comments. Ready for another round.

@freehan
Copy link
Contributor Author

freehan commented Oct 23, 2017

fixed comments and rebased. PTAL

"k8s1-0123456789abcdef-0123456789012-0123456789-0123456-71877a60",
},
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unit test trimFieldsEvenly to make sure we don't generate labels that are too long or crash if there is not enough space.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

// Syncer is an interface to interact with syncer
type Syncer interface {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Document the methods


// SyncerManager is an interface for controllers to manage Syncers
type SyncerManager interface {
EnsureSyncer(namespace, name string, targetPorts sets.String) error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Document the methods

"k8s.io/client-go/tools/record"
)

// syncerManager exposes a few interfaces to manage syncer and ensures thread safety.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this actually contain all the active sync goroutines and manage their lifecycle? Update the docstring comment, right now I find the description to be not very useful.

mu sync.Mutex
// svcPortMap is the canonical indicator for whether a service needs NEG
// key is service namespace/name, value is the list of target port that requires NEG
svcPortMap map[string]sets.String
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not

type serviceKey struct {
  ns string
  name string
}

type targetPorts sets.String

servicePortMap map[serviceKey]targetPorts

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the serviceKey (namespace/name) is consistent with the key used in apimachinery

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the serviceKey struct. But type targetPorts sets.String does not work. I am relying on methods like sets.String.Difference. This function is expecting another sets.Sting type arg.

leaving the value as sets.String is cleaner.

// key is service namespace/name, value is the list of target port that requires NEG
svcPortMap map[string]sets.String
// syncerMap stores the NEG syncer
// key is service namespace/name/targetPort. Value is the corresponding syncer
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

type struct negKey {
namespace string
name string
targetPort int
}

}
}

errList := []error{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move this to right before it is used.

}

errList := []error{}
// Start syncer for added ports
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Ensure a syncer is running for each port that is being added.

}

// StopSyncer stops all syncers for the input service.
func (manager *syncerManager) StopSyncer(namespace, name string) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably better name: StopServiceSyncers

key := serviceKeyFunc(namespace, name)
if ports, ok := manager.svcPortMap[key]; ok {
for _, port := range ports.List() {
syncer, ok := manager.syncerMap[encodeSyncerKey(namespace, name, port)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] if syncer, ok := ...; ok {

func (manager *syncerManager) GC() error {
glog.V(2).Infof("Start NEG garbage collection.")
defer glog.V(2).Infof("NEG garbage collection finished.")
// Garbage collect syncer
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete this comment

resyncPeriod time.Duration,
) (*Controller, error) {
// init event recorder
eventBroadcaster := record.NewBroadcaster()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this reuse the one created at the top level in Ingress

Copy link
Contributor Author

@freehan freehan Oct 24, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If so, need to move event recorder initialization in main. I think it would better to change this in a follow up PR. Added a TODO

}, stopCh)

glog.V(2).Infof("Starting network endpoint group controller")
defer c.stop()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to have explicit order.

defer func() {
 ...
}

if err != nil {
return err
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prefer not to have multipath dep on conditional

if ! exists {
 c.manager.StopSyncer(...)
}

func (c *Controller) garbageCollection() {
if err := c.manager.GC(); err != nil {
glog.Errorf("NEG controller garbage collection failed: %v", err)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

generate an event

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Events need to be associated with a API object.
I am not sure which one to associate this with.

return
}

glog.Warningf("Dropping service %q out of the queue: %v", key, err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

generate an event

}

func TestGatherSerivceTargetPortUsedByIngress(t *testing.T) {
ings := []extensions.Ingress{*getTestIngress(), *getTestIngress()}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we have a few more test cases here?

DefaultUnhealthyThreshold = 10
// DefaultTimeout defines the timeout of each probe
// DefaultNEGUnhealthyThreshold defines the threshold of failure probes that declare a network endpoint "unhealthy"
DefaultNEGUnhealthyThreshold = 2
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably want to state this is derived how?

// This is to preserve the existing health check setting as much as possible.
// WARNING: if a service backend is converted from IG mode to NEG mode,
// the existing health check setting will be preserve, although it may not suit the customer needs.
func mergeHealthcheckForNEG(oldHC, newHC *HealthCheck) *HealthCheck {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this actually do anything other than a straight copy of all the fields?

Copy link
Contributor Author

@freehan freehan Oct 24, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the approach @nicksardo and I discussed. It mainly copies the fields and manipulate port and portSpecification field to make it compatible. So that any manual changes are preserved while switching between IG and NEG mode.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

hc := healthChecks.New(8000, utils.ProtocolHTTP, true)
_, err := healthChecks.Sync(hc)
if err != nil {
t.Fatalf("unexpected error: %v", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got %v, want nil


ret, err := healthChecks.Get(8000, true)
if err != nil {
t.Fatalf("unexpected error: %v", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got %v, want nil

t.Fatalf("unexpected error: %v", err)
}
if ret.Port != 0 {
t.Errorf("Expect port to not be specified, but got %d.", ret.Port)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got ret.Port == %d, want 0

t.Errorf("Expect port to not be specified, but got %d.", ret.Port)
}
if ret.PortSpecification != UseServingPortSpecification {
t.Errorf("Expect port specification to be %q, but got %q.", UseServingPortSpecification, ret.PortSpecification)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got ret.PortSpecification = %q, want %q

@freehan
Copy link
Contributor Author

freehan commented Oct 24, 2017

Fixed the commemnts. PTAL

// For NEG, the api version for health check will be alpha.
// Hence, it will cause the health check links to be always different
// TODO (mixia): compare health check link directly once NEG is GA
splited := strings.Split(existingHCLink, "/")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make this its own helper func

@@ -358,6 +363,42 @@ IngressLoop:
return
}

func (s *StoreToEndpointLister) ListEndpointTargetPorts(namespace, name, targetPort string) []int {
// if targetPort is integer, no need to translate to endpoint ports
i, _ := strconv.Atoi(targetPort)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems more correct to do:

if i, err := strconv.Atoi(targetPort); err == nil {
  return []int{i}
}

Rather than relying on the special value of 0

ret := []int{}

if !exists {
glog.Errorf("Endpoint object %v/%v does not exists.", namespace, name)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] exist

no s

},
},
)
ret := []int{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move this down to the for loop

}
if err != nil {
glog.Errorf("Failed to retrieve endpoint object %v/%v: %v", namespace, name, err)
return ret
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return []int{}


if !exists {
glog.Errorf("Endpoint object %v/%v does not exists.", namespace, name)
return ret
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return []int{}

Start() error
// Start stops the syncer
Stop()
// Sync signals the syncer to sync NEG
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

State if this is blocking or async

// Start starts the syncer
Start() error
// Start stops the syncer
Stop()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

state if this waits for syncer to stop

type negSyncerManager interface {
// EnsureSyncer ensures corresponding syncers are started and stops any unnecessary syncer
EnsureSyncer(namespace, name string, targetPorts sets.String) error
// StopSyncer stops all syncers related to the service
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

state whether this waits for actually stop

// StopSyncer stops all syncers related to the service
StopSyncer(namespace, name string)
// Sync signals all syncers related to the service to sync
Sync(namespace, name string)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

state if this is async or blocking

}

// EnsureSyncer starts and stops syncers based on the input service ports.
func (manager *syncerManager) EnsureSyncer(namespace, name string, targetPorts sets.String) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EnsureSyncers

@freehan
Copy link
Contributor Author

freehan commented Oct 26, 2017

Fixed the comments. PTAL

@freehan
Copy link
Contributor Author

freehan commented Oct 26, 2017

Rebased. Let us merge it today.

@bowei bowei merged commit b0603c6 into kubernetes:master Oct 27, 2017
@freehan
Copy link
Contributor Author

freehan commented Oct 27, 2017

h00t!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
cncf-cla: yes Indicates the PR's author has signed the CNCF CLA. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants