Skip to content

Commit

Permalink
update Service via kubeClient
Browse files Browse the repository at this point in the history
  • Loading branch information
Ashley Gau committed Jun 22, 2018
1 parent d626339 commit 1e78753
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 37 deletions.
22 changes: 16 additions & 6 deletions pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ import (
"github.com/golang/glog"
apiv1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
unversionedcore "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -63,6 +65,7 @@ type Controller struct {
endpointSynced cache.InformerSynced
ingressLister cache.Indexer
serviceLister cache.Indexer
client kubernetes.Interface

// serviceQueue takes service key as work item. Service key with format "namespace/name".
serviceQueue workqueue.RateLimitingInterface
Expand Down Expand Up @@ -97,6 +100,7 @@ func NewController(
ctx.EndpointInformer.GetIndexer())

negController := &Controller{
client: ctx.KubeClient,
manager: manager,
resyncPeriod: resyncPeriod,
recorder: recorder,
Expand Down Expand Up @@ -242,7 +246,7 @@ func (c *Controller) processService(key string) error {
if !enabled {
c.manager.StopSyncer(namespace, name)
// delete the annotation
return c.syncNegStatusAnnotation(namespace, name, service, make(PortNameMap))
return c.syncNegStatusAnnotation(namespace, name, make(PortNameMap))
}

glog.V(2).Infof("Syncing service %q", key)
Expand Down Expand Up @@ -274,26 +278,32 @@ func (c *Controller) processService(key string) error {
svcPortMap = svcPortMap.Union(negSvcPorts)
}

err = c.syncNegStatusAnnotation(namespace, name, service, svcPortMap)
err = c.syncNegStatusAnnotation(namespace, name, svcPortMap)
if err != nil {
return err
}
return c.manager.EnsureSyncers(namespace, name, svcPortMap)
}

func (c *Controller) syncNegStatusAnnotation(namespace, name string, service *apiv1.Service, portMap PortNameMap) error {
func (c *Controller) syncNegStatusAnnotation(namespace, name string, portMap PortNameMap) error {
zones, err := c.zoneGetter.ListZones()
if err != nil {
return err
}
svcClient := c.client.CoreV1().Services(namespace)
service, err := svcClient.Get(name, metav1.GetOptions{})
if err != nil {
return err
}

// Remove NEG Status Annotation when no NEG is needed
if len(portMap) == 0 {
if _, ok := service.Annotations[annotations.NEGStatusKey]; ok {
// TODO: use PATCH to remove annotation
delete(service.Annotations, annotations.NEGStatusKey)
glog.V(2).Infof("Removing expose NEG annotation from service: %s/%s", namespace, name)
return c.serviceLister.Update(service)
_, err = svcClient.Update(service)
return err
}
// service doesn't have the expose NEG annotation and doesn't need update
return nil
Expand All @@ -316,8 +326,8 @@ func (c *Controller) syncNegStatusAnnotation(namespace, name string, service *ap

service.Annotations[annotations.NEGStatusKey] = annotation
glog.V(2).Infof("Updating NEG visibility annotation %q on service %s/%s.", annotation, namespace, name)
// TODO: use PATCH to Update Annotation
return c.serviceLister.Update(service)
_, err = svcClient.Update(service)
return err
}

func (c *Controller) handleErr(err error, key interface{}) {
Expand Down
64 changes: 33 additions & 31 deletions pkg/neg/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func TestNewNonNEGService(t *testing.T) {

controller := newTestController(fake.NewSimpleClientset())
defer controller.stop()
controller.serviceLister.Add(newTestService(false, []int32{}))
controller.serviceLister.Add(newTestService(controller, false, []int32{}))
controller.ingressLister.Add(newTestIngress())
err := controller.processService(serviceKeyFunc(testServiceNamespace, testServiceName))
if err != nil {
Expand Down Expand Up @@ -140,7 +140,7 @@ func TestNewNEGService(t *testing.T) {
controller := newTestController(fake.NewSimpleClientset())
defer controller.stop()
svcKey := serviceKeyFunc(testServiceNamespace, testServiceName)
controller.serviceLister.Add(newTestService(tc.ingress, tc.svcPorts))
controller.serviceLister.Add(newTestService(controller, tc.ingress, tc.svcPorts))

if tc.ingress {
controller.ingressLister.Add(newTestIngress())
Expand All @@ -160,11 +160,12 @@ func TestNewNEGService(t *testing.T) {
expectedSyncers = len(svcPorts.Union(testIngressPorts))
}
validateSyncers(t, controller, expectedSyncers, false)
svc, exists, _ := controller.serviceLister.GetByKey(svcKey)
if !exists || err != nil {
svcClient := controller.client.CoreV1().Services(testServiceNamespace)
svc, err := svcClient.Get(testServiceName, metav1.GetOptions{})
if err != nil {
t.Fatalf("Service was not created successfully, err: %v", err)
}
validateServiceStateAnnotation(t, svc.(*apiv1.Service), tc.svcPorts)
validateServiceStateAnnotation(t, svc, tc.svcPorts)
})
}
}
Expand All @@ -174,47 +175,48 @@ func TestEnableNEGServiceWithIngress(t *testing.T) {

controller := newTestController(fake.NewSimpleClientset())
defer controller.stop()
controller.serviceLister.Add(newTestService(false, []int32{}))
controller.serviceLister.Add(newTestService(controller, false, []int32{}))
controller.ingressLister.Add(newTestIngress())
svcClient := controller.client.CoreV1().Services(testServiceNamespace)
svcKey := serviceKeyFunc(testServiceNamespace, testServiceName)
err := controller.processService(svcKey)
if err != nil {
t.Fatalf("Failed to process service: %v", err)
}
validateSyncers(t, controller, 0, true)
svc, exists, _ := controller.serviceLister.GetByKey(svcKey)
if !exists || err != nil {
t.Fatalf("Service was not created successfully, err: %v", err)
svc, err := svcClient.Get(testServiceName, metav1.GetOptions{})
if err != nil {
t.Fatalf("Service was not created.(*apiv1.Service) successfully, err: %v", err)
}

controller.serviceLister.Update(newTestService(true, []int32{}))
controller.serviceLister.Update(newTestService(controller, true, []int32{}))
err = controller.processService(svcKey)
if err != nil {
t.Fatalf("Failed to process service: %v", err)
}
validateSyncers(t, controller, 3, false)
svc, exists, _ = controller.serviceLister.GetByKey(svcKey)
svc, err = svcClient.Get(testServiceName, metav1.GetOptions{})
svcPorts := []int32{80, 8081, 443}
if !exists || err != nil {
if err != nil {
t.Fatalf("Service was not created successfully, err: %v", err)
}
validateServiceStateAnnotation(t, svc.(*apiv1.Service), svcPorts)
validateServiceStateAnnotation(t, svc, svcPorts)
}

func TestDisableNEGServiceWithIngress(t *testing.T) {
t.Parallel()

controller := newTestController(fake.NewSimpleClientset())
defer controller.stop()
controller.serviceLister.Add(newTestService(true, []int32{}))
controller.serviceLister.Add(newTestService(controller, true, []int32{}))
controller.ingressLister.Add(newTestIngress())
err := controller.processService(serviceKeyFunc(testServiceNamespace, testServiceName))
if err != nil {
t.Fatalf("Failed to process service: %v", err)
}
validateSyncers(t, controller, 3, false)

controller.serviceLister.Update(newTestService(false, []int32{}))
controller.serviceLister.Update(newTestService(controller, false, []int32{}))
err = controller.processService(serviceKeyFunc(testServiceNamespace, testServiceName))
if err != nil {
t.Fatalf("Failed to process service: %v", err)
Expand Down Expand Up @@ -311,7 +313,9 @@ func TestGatherPortMappingUsedByIngress(t *testing.T) {
}

for _, tc := range testCases {
portMap := gatherPortMappingUsedByIngress(tc.ings, newTestService(true, []int32{}))
controller := newTestController(fake.NewSimpleClientset())
defer controller.stop()
portMap := gatherPortMappingUsedByIngress(tc.ings, newTestService(controller, true, []int32{}))
if len(portMap) != len(tc.expect) {
t.Errorf("Expect %v ports, but got %v.", len(tc.expect), len(portMap))
}
Expand All @@ -330,8 +334,8 @@ func TestSyncNegAnnotation(t *testing.T) {
// is changed. When there is no change, Update should not be called.
controller := newTestController(fake.NewSimpleClientset())
defer controller.stop()
controller.serviceLister.Add(newTestService(false, []int32{}))
svcKey := serviceKeyFunc(testServiceNamespace, testServiceName)
svcClient := controller.client.CoreV1().Services(testServiceNamespace)
newTestService(controller, false, []int32{})

testCases := []struct {
desc string
Expand Down Expand Up @@ -363,28 +367,23 @@ func TestSyncNegAnnotation(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
svc, exists, err := controller.serviceLister.GetByKey(svcKey)
if !exists || err != nil {
t.Fatalf("Service was not retrieved successfully, err: %v", err)
}

controller.syncNegStatusAnnotation(testServiceNamespace, testServiceName, svc.(*apiv1.Service), tc.previousPortMap)
svc, _, _ = controller.serviceLister.GetByKey(svcKey)
controller.syncNegStatusAnnotation(testServiceNamespace, testServiceName, tc.previousPortMap)
svc, _ := svcClient.Get(testServiceName, metav1.GetOptions{})

var oldSvcPorts []int32
for port := range tc.previousPortMap {
oldSvcPorts = append(oldSvcPorts, port)
}
validateServiceStateAnnotation(t, svc.(*apiv1.Service), oldSvcPorts)
validateServiceStateAnnotation(t, svc, oldSvcPorts)

controller.syncNegStatusAnnotation(testServiceNamespace, testServiceName, svc.(*apiv1.Service), tc.portMap)
svc, _, _ = controller.serviceLister.GetByKey(svcKey)
controller.syncNegStatusAnnotation(testServiceNamespace, testServiceName, tc.portMap)
svc, _ = svcClient.Get(testServiceName, metav1.GetOptions{})

var svcPorts []int32
for port := range tc.portMap {
svcPorts = append(svcPorts, port)
}
validateServiceStateAnnotation(t, svc.(*apiv1.Service), svcPorts)
validateServiceStateAnnotation(t, svc, svcPorts)
})
}
}
Expand Down Expand Up @@ -490,7 +489,7 @@ func newTestIngress() *extensions.Ingress {
}
}

func newTestService(negIngress bool, negSvcPorts []int32) *apiv1.Service {
func newTestService(c *Controller, negIngress bool, negSvcPorts []int32) *apiv1.Service {
svcAnnotations := map[string]string{}
if negIngress || len(negSvcPorts) > 0 {
svcAnnotations[annotations.NEGAnnotationKey] = generateNegAnnotation(negIngress, negSvcPorts)
Expand Down Expand Up @@ -526,7 +525,7 @@ func newTestService(negIngress bool, negSvcPorts []int32) *apiv1.Service {
)
}

return &apiv1.Service{
svc := &apiv1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: testServiceName,
Namespace: testServiceNamespace,
Expand All @@ -536,4 +535,7 @@ func newTestService(negIngress bool, negSvcPorts []int32) *apiv1.Service {
Ports: ports,
},
}

c.client.CoreV1().Services(testServiceNamespace).Create(svc)
return svc
}

0 comments on commit 1e78753

Please sign in to comment.