Skip to content

Commit

Permalink
Merge pull request #1127 from skmatti/patch-svc
Browse files Browse the repository at this point in the history
Use Patch instead of Update for k8s Service client
  • Loading branch information
k8s-ci-robot committed Oct 7, 2020
2 parents 182030e + f780ca4 commit 8ccedad
Show file tree
Hide file tree
Showing 11 changed files with 347 additions and 185 deletions.
4 changes: 2 additions & 2 deletions pkg/cmconfig/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/patch"
"k8s.io/klog"
)

Expand Down Expand Up @@ -65,7 +65,7 @@ func (c *ConfigMapConfigController) GetConfig() Config {
}

func (c *ConfigMapConfigController) updateASMReady(status string) {
patchBytes, err := utils.StrategicMergePatchBytes(v1.ConfigMap{Data: map[string]string{}},
patchBytes, err := patch.StrategicMergePatchBytes(v1.ConfigMap{Data: map[string]string{}},
v1.ConfigMap{Data: map[string]string{asmReady: status}}, v1.ConfigMap{})
if err != nil {
c.RecordEvent("Warning", "FailedToUpdateASMStatus", fmt.Sprintf("Failed to update ASM Status, failed to create patch for ASM ConfigMap, error: %s", err))
Expand Down
4 changes: 2 additions & 2 deletions pkg/experimental/workload/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
workloadv1a1 "k8s.io/ingress-gce/pkg/experimental/apis/workload/v1alpha1"
workloadclient "k8s.io/ingress-gce/pkg/experimental/workload/client/clientset/versioned"
daemonutils "k8s.io/ingress-gce/pkg/experimental/workload/daemon/utils"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/patch"
"k8s.io/klog"
)

Expand Down Expand Up @@ -177,7 +177,7 @@ func OutputCredentials(credentials daemonutils.ClusterCredentials) {

// preparePatchBytesforWorkloadStatus generates patch bytes based on the old and new workload status
func preparePatchBytesforWorkloadStatus(oldStatus, newStatus workloadv1a1.WorkloadStatus) ([]byte, error) {
patchBytes, err := utils.StrategicMergePatchBytes(
patchBytes, err := patch.StrategicMergePatchBytes(
workloadv1a1.Workload{Status: oldStatus},
workloadv1a1.Workload{Status: newStatus},
workloadv1a1.Workload{},
Expand Down
39 changes: 9 additions & 30 deletions pkg/l4/l4controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,16 @@ limitations under the License.
package l4

import (
context2 "context"
"fmt"
"reflect"
"sync"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/cloud-provider/service/helpers"
"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/backends"
"k8s.io/ingress-gce/pkg/context"
Expand All @@ -40,6 +37,7 @@ import (
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/common"
"k8s.io/ingress-gce/pkg/utils/namer"
"k8s.io/ingress-gce/pkg/utils/patch"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/apis/core/v1/helper"
)
Expand Down Expand Up @@ -186,7 +184,7 @@ func (l4c *L4Controller) processServiceCreateOrUpdate(key string, service *v1.Se
}
l4c.ctx.Recorder(service.Namespace).Eventf(service, v1.EventTypeNormal, "SyncLoadBalancerSuccessful",
"Successfully ensured load balancer resources")
if err = l4c.updateAnnotations(service.Name, service.Namespace, annotationsMap); err != nil {
if err = l4c.updateAnnotations(service, annotationsMap); err != nil {
l4c.ctx.Recorder(service.Namespace).Eventf(service, v1.EventTypeWarning, "SyncLoadBalancerFailed",
"Failed to update annotations for load balancer, err: %v", err)
return fmt.Errorf("failed to set resource annotations, err: %v", err)
Expand All @@ -202,7 +200,7 @@ func (l4c *L4Controller) processServiceDeletion(key string, svc *v1.Service) err
return err
}
// Also remove any ILB annotations from the service metadata
if err := l4c.updateAnnotations(svc.Name, svc.Namespace, nil); err != nil {
if err := l4c.updateAnnotations(svc, nil); err != nil {
l4c.ctx.Recorder(svc.Namespace).Eventf(svc, v1.EventTypeWarning, "DeleteLoadBalancer",
"Error resetting resource annotations for load balancer: %v", err)
return fmt.Errorf("failed to reset resource annotations, err: %v", err)
Expand Down Expand Up @@ -272,36 +270,17 @@ func (l4c *L4Controller) updateServiceStatus(svc *v1.Service, newStatus *v1.Load
if helper.LoadBalancerStatusEqual(&svc.Status.LoadBalancer, newStatus) {
return nil
}
updated := svc.DeepCopy()
updated.Status.LoadBalancer = *newStatus
if _, err := helpers.PatchService(l4c.ctx.KubeClient.CoreV1(), svc, updated); err != nil {
return err
}
return nil
return patch.PatchServiceLoadBalancerStatus(l4c.ctx.KubeClient.CoreV1(), svc, *newStatus)
}

func (l4c *L4Controller) updateServiceMetadata(svc *v1.Service, newObjectMetadata metav1.ObjectMeta) error {
updated := svc.DeepCopy()
updated.ObjectMeta = newObjectMetadata
if _, err := helpers.PatchService(l4c.ctx.KubeClient.CoreV1(), svc, updated); err != nil {
return err
}
return nil
}

func (l4c *L4Controller) updateAnnotations(name, namespace string, newILBAnnotations map[string]string) error {
svcClient := l4c.ctx.KubeClient.CoreV1().Services(namespace)
currSvc, err := svcClient.Get(context2.TODO(), name, metav1.GetOptions{})
if err != nil {
return err
}
newObjectMeta := currSvc.ObjectMeta.DeepCopy()
func (l4c *L4Controller) updateAnnotations(svc *v1.Service, newILBAnnotations map[string]string) error {
newObjectMeta := svc.ObjectMeta.DeepCopy()
newObjectMeta.Annotations = mergeAnnotations(newObjectMeta.Annotations, newILBAnnotations)
if reflect.DeepEqual(currSvc.Annotations, newObjectMeta.Annotations) {
if reflect.DeepEqual(svc.Annotations, newObjectMeta.Annotations) {
return nil
}
klog.V(3).Infof("Updating annotations of service %v/%v", namespace, name)
return l4c.updateServiceMetadata(currSvc, *newObjectMeta)
klog.V(3).Infof("Patching annotations of service %v/%v", svc.Namespace, svc.Name)
return patch.PatchServiceObjectMetadata(l4c.ctx.KubeClient.CoreV1(), svc, *newObjectMeta)
}

// mergeAnnotations merges the new set of ilb resource annotations with the pre-existing service annotations.
Expand Down
24 changes: 12 additions & 12 deletions pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/common"
namer2 "k8s.io/ingress-gce/pkg/utils/namer"
"k8s.io/ingress-gce/pkg/utils/patch"
"k8s.io/klog"
)

Expand Down Expand Up @@ -624,20 +625,19 @@ func (c *Controller) syncNegStatusAnnotation(namespace, name string, portMap neg
if err != nil {
return err
}
svcClient := c.client.CoreV1().Services(namespace)
service, err := svcClient.Get(context2.TODO(), name, metav1.GetOptions{})
coreClient := c.client.CoreV1()
service, err := coreClient.Services(namespace).Get(context2.TODO(), name, metav1.GetOptions{})
if err != nil {
return err
}

// Remove NEG Status Annotation when no NEG is needed
if len(portMap) == 0 {
if _, ok := service.Annotations[annotations.NEGStatusKey]; ok {
// TODO: use PATCH to remove annotation
delete(service.Annotations, annotations.NEGStatusKey)
newSvcObjectMeta := service.ObjectMeta.DeepCopy()
delete(newSvcObjectMeta.Annotations, annotations.NEGStatusKey)
klog.V(2).Infof("Removing NEG status annotation from service: %s/%s", namespace, name)
_, err = svcClient.Update(context2.TODO(), service, metav1.UpdateOptions{})
return err
return patch.PatchServiceObjectMetadata(coreClient, service, *newSvcObjectMeta)
}
// service doesn't have the expose NEG annotation and doesn't need update
return nil
Expand All @@ -652,14 +652,14 @@ func (c *Controller) syncNegStatusAnnotation(namespace, name string, portMap neg
if ok && existingAnnotation == annotation {
return nil
}
newSvcObjectMeta := service.ObjectMeta.DeepCopy()
// If enableCSM=true, it's possible a service having nil Annotations.
if service.Annotations == nil {
service.Annotations = make(map[string]string)
if newSvcObjectMeta.Annotations == nil {
newSvcObjectMeta.Annotations = make(map[string]string)
}
service.Annotations[annotations.NEGStatusKey] = annotation
newSvcObjectMeta.Annotations[annotations.NEGStatusKey] = annotation
klog.V(2).Infof("Updating NEG visibility annotation %q on service %s/%s.", annotation, namespace, name)
_, err = svcClient.Update(context2.TODO(), service, metav1.UpdateOptions{})
return err
return patch.PatchServiceObjectMetadata(coreClient, service, *newSvcObjectMeta)
}

// syncDestinationRuleNegStatusAnnotation syncs the destinationrule related neg status annotation
Expand Down Expand Up @@ -693,7 +693,7 @@ func (c *Controller) syncDestinationRuleNegStatusAnnotation(namespace, destinati
newDestinationRule := destinationRule.DeepCopy()
newDestinationRule.SetAnnotations(drAnnotations)
// Get the diff, we only need the Object meta diff.
patchBytes, err := utils.StrategicMergePatchBytes(destinationRule, newDestinationRule, struct {
patchBytes, err := patch.StrategicMergePatchBytes(destinationRule, newDestinationRule, struct {
metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
}{})
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/neg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/common"
"k8s.io/ingress-gce/pkg/utils/patch"
"k8s.io/klog"
utilpointer "k8s.io/utils/pointer"
)
Expand Down Expand Up @@ -576,7 +577,7 @@ func deleteSvcNegCR(svcNegClient svcnegclient.Interface, negCR *negv1beta1.Servi

// patchNegStatus patches the specified NegCR status with the provided new status
func patchNegStatus(svcNegClient svcnegclient.Interface, oldNeg, newNeg negv1beta1.ServiceNetworkEndpointGroup) (*negv1beta1.ServiceNetworkEndpointGroup, error) {
patchBytes, err := utils.MergePatchBytes(oldNeg, newNeg)
patchBytes, err := patch.MergePatchBytes(oldNeg, newNeg)
if err != nil {
return nil, fmt.Errorf("failed to prepare patch bytes: %s", err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/neg/readiness/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"k8s.io/client-go/tools/cache"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/ingress-gce/pkg/neg/types/shared"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/patch"
"k8s.io/klog"
)

Expand Down Expand Up @@ -113,7 +113,7 @@ func patchPodStatus(c clientset.Interface, namespace, name string, patchBytes []

// preparePatchBytesforPodStatus generates patch bytes based on the old and new pod status
func preparePatchBytesforPodStatus(oldPodStatus, newPodStatus v1.PodStatus) ([]byte, error) {
patchBytes, err := utils.StrategicMergePatchBytes(v1.Pod{Status: oldPodStatus}, v1.Pod{Status: newPodStatus}, v1.Pod{})
patchBytes, err := patch.StrategicMergePatchBytes(v1.Pod{Status: oldPodStatus}, v1.Pod{Status: newPodStatus}, v1.Pod{})
return patchBytes, err
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
"k8s.io/ingress-gce/pkg/neg/readiness"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/patch"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/apis/core"
)
Expand Down Expand Up @@ -546,7 +546,7 @@ func getNegFromStore(svcNegLister cache.Indexer, namespace, negName string) (*ne

// patchNegStatus patches the specified NegCR status with the provided new status
func patchNegStatus(svcNegClient svcnegclient.Interface, oldStatus, newStatus negv1beta1.ServiceNetworkEndpointGroupStatus, namespace, negName string) (*negv1beta1.ServiceNetworkEndpointGroup, error) {
patchBytes, err := utils.MergePatchBytes(negv1beta1.ServiceNetworkEndpointGroup{Status: oldStatus}, negv1beta1.ServiceNetworkEndpointGroup{Status: newStatus})
patchBytes, err := patch.MergePatchBytes(negv1beta1.ServiceNetworkEndpointGroup{Status: oldStatus}, negv1beta1.ServiceNetworkEndpointGroup{Status: newStatus})
if err != nil {
return nil, fmt.Errorf("failed to prepare patch bytes: %s", err)
}
Expand Down
48 changes: 11 additions & 37 deletions pkg/utils/common/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,14 @@ limitations under the License.
package common

import (
"context"
"encoding/json"
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/api/networking/v1beta1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/kubernetes"
coreclient "k8s.io/client-go/kubernetes/typed/core/v1"
client "k8s.io/client-go/kubernetes/typed/networking/v1beta1"
"k8s.io/ingress-gce/pkg/utils/patch"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/util/slice"
)
Expand Down Expand Up @@ -102,12 +98,12 @@ func EnsureServiceFinalizer(service *corev1.Service, key string, kubeClient kube
return nil
}

// Make a copy so we don't mutate the shared informer cache.
updated := service.DeepCopy()
updated.ObjectMeta.Finalizers = append(updated.ObjectMeta.Finalizers, key)
// Make a copy of object metadata so we don't mutate the shared informer cache.
updatedObjectMeta := service.ObjectMeta.DeepCopy()
updatedObjectMeta.Finalizers = append(updatedObjectMeta.Finalizers, key)

klog.V(2).Infof("Adding finalizer %s to service %s/%s", key, updated.Namespace, updated.Name)
return patchServiceFinalizer(kubeClient.CoreV1().Services(updated.Namespace), service, updated)
klog.V(2).Infof("Adding finalizer %s to service %s/%s", key, service.Namespace, service.Name)
return patch.PatchServiceObjectMetadata(kubeClient.CoreV1(), service, *updatedObjectMeta)
}

// removeFinalizer patches the service to remove finalizer.
Expand All @@ -116,32 +112,10 @@ func EnsureDeleteServiceFinalizer(service *corev1.Service, key string, kubeClien
return nil
}

// Make a copy so we don't mutate the shared informer cache.
updated := service.DeepCopy()
updated.ObjectMeta.Finalizers = slice.RemoveString(updated.ObjectMeta.Finalizers, key, nil)
// Make a copy of object metadata so we don't mutate the shared informer cache.
updatedObjectMeta := service.ObjectMeta.DeepCopy()
updatedObjectMeta.Finalizers = slice.RemoveString(updatedObjectMeta.Finalizers, key, nil)

klog.V(2).Infof("Removing finalizer from service %s/%s", updated.Namespace, updated.Name)
return patchServiceFinalizer(kubeClient.CoreV1().Services(updated.Namespace), service, updated)
}

func patchServiceFinalizer(sc coreclient.ServiceInterface, oldSvc, newSvc *corev1.Service) error {
svcKey := fmt.Sprintf("%s/%s", oldSvc.Namespace, oldSvc.Name)
oldData, err := json.Marshal(oldSvc)
if err != nil {
return fmt.Errorf("failed to Marshal oldData for service %s: %v", svcKey, err)
}

newData, err := json.Marshal(newSvc)
if err != nil {
return fmt.Errorf("failed to Marshal newData for service %s: %v", svcKey, err)
}

patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Service{})
if err != nil {
return fmt.Errorf("failed to create TwoWayMergePatch for service %s: %v", svcKey, err)
}

klog.V(3).Infof("Patch bytes for service %s: %s", svcKey, patchBytes)
_, err = sc.Patch(context.TODO(), oldSvc.Name, types.StrategicMergePatchType, patchBytes, meta_v1.PatchOptions{}, "status")
return err
klog.V(2).Infof("Removing finalizer from service %s/%s", service.Namespace, service.Name)
return patch.PatchServiceObjectMetadata(kubeClient.CoreV1(), service, *updatedObjectMeta)
}
24 changes: 23 additions & 1 deletion pkg/utils/patch.go → pkg/utils/patch/patch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,18 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package utils
package patch

import (
"encoding/json"
"fmt"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/jsonmergepatch"
"k8s.io/apimachinery/pkg/util/strategicpatch"
coreclient "k8s.io/client-go/kubernetes/typed/core/v1"
svchelpers "k8s.io/cloud-provider/service/helpers"
)

// StrategicMergePatchBytes returns a patch between the old and new object using a strategic merge patch.
Expand Down Expand Up @@ -62,3 +66,21 @@ func MergePatchBytes(old, cur interface{}) ([]byte, error) {

return patchBytes, nil
}

// PatchServiceObjectMetadata patches the given service's metadata based on new
// service metadata.
func PatchServiceObjectMetadata(client coreclient.CoreV1Interface, svc *corev1.Service, newObjectMetadata metav1.ObjectMeta) error {
newSvc := svc.DeepCopy()
newSvc.ObjectMeta = newObjectMetadata
_, err := svchelpers.PatchService(client, svc, newSvc)
return err
}

// PatchServiceLoadBalancerStatus patches the given service's LoadBalancerStatus
// based on new service's load-balancer status.
func PatchServiceLoadBalancerStatus(client coreclient.CoreV1Interface, svc *corev1.Service, newStatus corev1.LoadBalancerStatus) error {
newSvc := svc.DeepCopy()
newSvc.Status.LoadBalancer = newStatus
_, err := svchelpers.PatchService(client, svc, newSvc)
return err
}
Loading

0 comments on commit 8ccedad

Please sign in to comment.