Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Backend GC #810

Merged
merged 1 commit into from
Aug 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions pkg/backends/backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package backends

import (
"fmt"
"k8s.io/ingress-gce/pkg/flags"
"net/http"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
Expand Down Expand Up @@ -195,17 +194,13 @@ func (b *Backends) Health(name string, version meta.Version, scope meta.KeyType)
}

// List lists all backends managed by this controller.
func (b *Backends) List() ([]*composite.BackendService, error) {
func (b *Backends) List(key *meta.Key, version meta.Version) ([]*composite.BackendService, error) {
// TODO: for consistency with the rest of this sub-package this method
// should return a list of backend ports.
var backends []*composite.BackendService
var err error
if flags.F.EnableL7Ilb {
backends, err = composite.ListAllBackendServices(b.cloud)
} else {
// TODO: (shance) this needs to be changed to not take a key
backends, err = composite.ListBackendServices(b.cloud, meta.GlobalKey(""), meta.VersionGA)
}

backends, err = composite.ListBackendServices(b.cloud, key, version)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/backends/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type Pool interface {
// Get the health of a BackendService given its name.
Health(name string, version meta.Version, scope meta.KeyType) (string, error)
// Get a list of BackendService names that are managed by this pool.
List() ([]*composite.BackendService, error)
List(key *meta.Key, version meta.Version) ([]*composite.BackendService, error)
}

// Syncer is an interface to sync Kubernetes services to GCE BackendServices.
Expand Down
38 changes: 35 additions & 3 deletions pkg/backends/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/ingress-gce/pkg/backends/features"
"k8s.io/ingress-gce/pkg/composite"
"k8s.io/ingress-gce/pkg/flags"
"k8s.io/ingress-gce/pkg/healthchecks"
lbfeatures "k8s.io/ingress-gce/pkg/loadbalancers/features"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/klog"
"k8s.io/legacy-cloud-providers/gce"
Expand Down Expand Up @@ -68,7 +70,6 @@ func (s *backendSyncer) Sync(svcPorts []utils.ServicePort) error {
}
}
return nil

}

// ensureBackendService will update or create a BackendService for the given port.
Expand Down Expand Up @@ -143,11 +144,42 @@ func (s *backendSyncer) GC(svcPorts []utils.ServicePort) error {
return err
}

backends, err := s.backendPool.List()
// Only GC L7 ILB backends if it's enabled
if flags.F.EnableL7Ilb {
// TODO(shance): Refactor out empty key field
key, err := composite.CreateKey(s.cloud, "", meta.Regional)
if err != nil {
return fmt.Errorf("error creating l7 ilb key: %v", err)
}
ilbBackends, err := s.backendPool.List(key, lbfeatures.L7ILBVersions().BackendService)
if err != nil {
return fmt.Errorf("error listing regional backends: %v", err)
}
err = s.gc(ilbBackends, knownPorts)
if err != nil {
return fmt.Errorf("error GCing regional Backends: %v", err)
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a TODO, but we may want to consider continuing on error so a single error does not block all GC operations

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tracking here: #797


// Requires an empty name field until it is refactored out
key, err := composite.CreateKey(s.cloud, "", meta.Global)
if err != nil {
return fmt.Errorf("error creating l7 ilb key: %v", err)
}
backends, err := s.backendPool.List(key, meta.VersionGA)
if err != nil {
return fmt.Errorf("error listing backends: %v", err)
}
err = s.gc(backends, knownPorts)
if err != nil {
return fmt.Errorf("error getting the names of controller-managed backends: %v", err)
return fmt.Errorf("error GCing Backends: %v", err)
}

return nil
}

// gc deletes the provided backends
func (s *backendSyncer) gc(backends []*composite.BackendService, knownPorts sets.String) error {
for _, be := range backends {
var key *meta.Key
name := be.Name
Expand Down
179 changes: 147 additions & 32 deletions pkg/backends/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package backends
import (
"context"
"fmt"
"k8s.io/ingress-gce/pkg/composite"
"net/http"
"reflect"
"testing"
Expand All @@ -39,6 +40,78 @@ import (
"k8s.io/legacy-cloud-providers/gce"
)

// portset helps keep track of service ports during GC tests
type portset struct {
// all represents the set all of service ports in the test
all map[utils.ServicePort]bool
// existing represents what should exist in GCE
existing map[utils.ServicePort]bool
}

func newPortset(ports []utils.ServicePort) *portset {
ps := portset{all: map[utils.ServicePort]bool{}, existing: map[utils.ServicePort]bool{}}
for _, sp := range ports {
ps.all[sp] = true
}
return &ps
}

func (p *portset) existingPorts() []utils.ServicePort {
var result []utils.ServicePort
for sp, _ := range p.existing {
result = append(result, sp)
}
return result
}

// Add to 'existing' from all
func (p *portset) add(ports []utils.ServicePort) error {
for _, sp := range ports {
// Sanity check
if found := p.all[sp]; !found {
return fmt.Errorf("%+v not found in p.all", sp)
}
p.existing[sp] = true
}
return nil
}

// Delete from 'existing'
func (p *portset) del(ports []utils.ServicePort) error {
for _, sp := range ports {
found := p.existing[sp]
if !found {
return fmt.Errorf("%+v not found in p.existing", sp)
}
delete(p.existing, sp)
}
return nil
}

// check() iterates through all and checks that the ports in 'existing' exist in gce, and that those
// that are not in 'existing' do not exist
func (p *portset) check(fakeGCE *gce.Cloud) error {
for sp, _ := range p.all {
_, found := p.existing[sp]
beName := sp.BackendName(defaultNamer)
key, err := composite.CreateKey(fakeGCE, beName, features.ScopeFromServicePort(&sp))
if err != nil {
return fmt.Errorf("Error creating key for backend service %s: %v", beName, err)
}

if found {
if _, err := composite.GetBackendService(fakeGCE, key, features.VersionFromServicePort(&sp)); err != nil {
return fmt.Errorf("backend for port %+v should exist, but got: %v", sp.NodePort, err)
}
} else {
if bs, err := composite.GetBackendService(fakeGCE, key, features.VersionFromServicePort(&sp)); !utils.IsHTTPErrorCode(err, http.StatusNotFound) {
return fmt.Errorf("backend for port %+v should not exist, but got %v", sp, bs)
}
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need to check things in p.all not in p.existing don't exist?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return nil
}

var (
defaultNamer = utils.NewNamer("uid1", "fw1")
defaultBackendSvc = types.NamespacedName{Namespace: "system", Name: "default"}
Expand All @@ -65,6 +138,7 @@ func newTestSyncer(fakeGCE *gce.Cloud) *backendSyncer {
backendPool: fakeBackendPool,
healthChecker: fakeHealthChecks,
namer: defaultNamer,
cloud: fakeGCE,
}

probes := map[utils.ServicePort]*api_v1.Probe{{NodePort: 443, Protocol: annotations.ProtocolHTTPS}: existingProbe}
Expand All @@ -76,6 +150,7 @@ func newTestSyncer(fakeGCE *gce.Cloud) *backendSyncer {
(fakeGCE.Compute().(*cloud.MockGCE)).MockBackendServices.UpdateHook = mock.UpdateBackendServiceHook
(fakeGCE.Compute().(*cloud.MockGCE)).MockHealthChecks.UpdateHook = mock.UpdateHealthCheckHook
(fakeGCE.Compute().(*cloud.MockGCE)).MockAlphaHealthChecks.UpdateHook = mock.UpdateAlphaHealthCheckHook
(fakeGCE.Compute().(*cloud.MockGCE)).MockAlphaRegionHealthChecks.UpdateHook = mock.UpdateAlphaRegionHealthCheckHook
(fakeGCE.Compute().(*cloud.MockGCE)).MockBetaHealthChecks.UpdateHook = mock.UpdateBetaHealthCheckHook

return syncer
Expand Down Expand Up @@ -212,6 +287,7 @@ func TestSyncUpdateHTTP2(t *testing.T) {
}
}

// Test GC with both ELB and ILBs
func TestGC(t *testing.T) {
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
syncer := newTestSyncer(fakeGCE)
Expand All @@ -221,49 +297,88 @@ func TestGC(t *testing.T) {
{NodePort: 82, Protocol: annotations.ProtocolHTTPS},
{NodePort: 83, Protocol: annotations.ProtocolHTTP},
}
ps := newPortset(svcNodePorts)
if err := ps.add(svcNodePorts); err != nil {
t.Fatal(err)
}

if err := syncer.Sync(svcNodePorts); err != nil {
t.Fatalf("Expected syncer to add backends with error, err: %v", err)
if err := syncer.Sync(ps.existingPorts()); err != nil {
t.Fatalf("syncer.Sync(%+v) = %v, want nil ", ps.existingPorts(), err)
}
// Check that all backends were created.
for _, sp := range svcNodePorts {
beName := sp.BackendName(defaultNamer)
if _, err := fakeGCE.GetGlobalBackendService(beName); err != nil {
t.Fatalf("Expected to find backend for port %v, err: %v", sp.NodePort, err)
}

if err := ps.check(fakeGCE); err != nil {
t.Fatal(err)
}

// Run a no-op GC (i.e nothing is actually cleaned up)
if err := syncer.GC(svcNodePorts); err != nil {
t.Fatalf("Expected backend pool to GC, err: %v", err)
if err := syncer.GC(ps.existingPorts()); err != nil {
t.Fatalf("syncer.GC(%+v) = %v, want nil", ps.existingPorts(), err)
}
// Ensure that no backends were actually deleted
for _, sp := range svcNodePorts {
beName := sp.BackendName(defaultNamer)
if _, err := fakeGCE.GetGlobalBackendService(beName); err != nil {
t.Fatalf("Expected to find backend for port %v, err: %v", sp.NodePort, err)
}

// Check that nothing was deleted
if err := ps.check(fakeGCE); err != nil {
t.Fatal(err)
}

deletedPorts := []utils.ServicePort{svcNodePorts[1], svcNodePorts[2]}
svcNodePorts = []utils.ServicePort{svcNodePorts[0]}
if err := syncer.GC(svcNodePorts); err != nil {
t.Fatalf("Expected backend pool to GC, err: %v", err)
if err := ps.del([]utils.ServicePort{svcNodePorts[1], svcNodePorts[2]}); err != nil {
t.Fatal(err)
}

// Ensure that 2 out of the 3 backends were deleted
for _, sp := range deletedPorts {
beName := sp.BackendName(defaultNamer)
if _, err := fakeGCE.GetGlobalBackendService(beName); err == nil {
t.Fatalf("Expected to not find backend for port %v", sp.NodePort)
}
if err := syncer.GC(ps.existingPorts()); err != nil {
t.Fatalf("syncer.GC(%+v) = %v, want nil", ps.existingPorts(), err)
}

// Ensure that the 1 remaining backend exists
for _, sp := range svcNodePorts {
beName := sp.BackendName(defaultNamer)
if _, err := fakeGCE.GetGlobalBackendService(beName); err != nil {
t.Fatalf("Expected to find backend for port %v, err: %v", sp.NodePort, err)
}
if err := ps.check(fakeGCE); err != nil {
t.Fatal(err)
}
}

// Test GC with both ELB and ILBs
bowei marked this conversation as resolved.
Show resolved Hide resolved
func TestGCMixed(t *testing.T) {
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
syncer := newTestSyncer(fakeGCE)

svcNodePorts := []utils.ServicePort{
{NodePort: 81, Protocol: annotations.ProtocolHTTP},
{NodePort: 82, Protocol: annotations.ProtocolHTTPS},
{NodePort: 83, Protocol: annotations.ProtocolHTTP},
{NodePort: 84, Protocol: annotations.ProtocolHTTP, NEGEnabled: true, L7ILBEnabled: true},
{NodePort: 85, Protocol: annotations.ProtocolHTTPS, NEGEnabled: true, L7ILBEnabled: true},
{NodePort: 86, Protocol: annotations.ProtocolHTTP, NEGEnabled: true, L7ILBEnabled: true},
}
ps := newPortset(svcNodePorts)
if err := ps.add(svcNodePorts); err != nil {
t.Fatal(err)
}

if err := syncer.Sync(ps.existingPorts()); err != nil {
t.Fatalf("syncer.Sync(%+v) = %v, want nil ", ps.existingPorts(), err)
}

if err := ps.check(fakeGCE); err != nil {
t.Fatal(err)
}

// Run a no-op GC (i.e nothing is actually cleaned up)
if err := syncer.GC(ps.existingPorts()); err != nil {
t.Fatalf("syncer.GC(%+v) = %v, want nil", ps.existingPorts(), err)
}

// Check that nothing was deleted
if err := ps.check(fakeGCE); err != nil {
t.Fatal(err)
}

if err := ps.del([]utils.ServicePort{svcNodePorts[1], svcNodePorts[2]}); err != nil {
t.Fatal(err)
}

if err := syncer.GC(ps.existingPorts()); err != nil {
t.Fatalf("syncer.GC(%+v) = %v, want nil", ps.existingPorts(), err)
}

if err := ps.check(fakeGCE); err != nil {
t.Fatal(err)
}
}

Expand Down
21 changes: 15 additions & 6 deletions pkg/healthchecks/healthchecks.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,17 +208,16 @@ func (h *HealthChecks) updateILB(oldHC, newHC *HealthCheck) error {
// special case ILB to avoid mucking with stable HC code
cloud := h.cloud.(*gce.Cloud)

compositeType, err := composite.ToHealthCheck(newHC)
mergedHC := mergeHealthcheck(oldHC, newHC).ToAlphaComputeHealthCheck()
compositeType, err := composite.ToHealthCheck(mergedHC)
if err != nil {
return fmt.Errorf("Error converting newHC to composite: %v", err)
}
key, err := composite.CreateKey(cloud, newHC.Name, features.L7ILBScope())
key, err := composite.CreateKey(cloud, mergedHC.Name, features.L7ILBScope())

// Update fields
compositeType.Version = features.L7ILBVersions().HealthCheck
compositeType.Region = key.Region
compositeType.HttpHealthCheck.Port = 0
compositeType.HttpHealthCheck.PortSpecification = oldHC.HttpHealthCheck.PortSpecification

return composite.UpdateHealthCheck(cloud, key, compositeType)
}
Expand Down Expand Up @@ -310,11 +309,21 @@ func (h *HealthChecks) getILB(name string) (*HealthCheck, error) {
if err != nil {
return nil, err
}
alphaHC, err := hc.ToAlpha()
gceHC, err := hc.ToAlpha()
if err != nil {
return nil, err
}
return NewHealthCheck(alphaHC)

newHC, err := NewHealthCheck(gceHC)
if err != nil {
return nil, err
}

// Update fields for future update() calls
newHC.forILB = true
newHC.ForNEG = true

return newHC, nil
}

// Get returns the health check by port
Expand Down
1 change: 1 addition & 0 deletions pkg/loadbalancers/loadbalancers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,7 @@ func TestIdenticalHostnameCerts(t *testing.T) {
UrlMap: gceUrlMap,
Ingress: newIngress(),
}

// Sync multiple times to make sure ordering is preserved
for i := 0; i < 10; i++ {
if _, err := j.pool.Ensure(lbInfo); err != nil {
Expand Down