Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Patch instead of Update for k8s Service client #1127

Merged
merged 2 commits into from
Oct 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/cmconfig/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/patch"
"k8s.io/klog"
)

Expand Down Expand Up @@ -65,7 +65,7 @@ func (c *ConfigMapConfigController) GetConfig() Config {
}

func (c *ConfigMapConfigController) updateASMReady(status string) {
patchBytes, err := utils.StrategicMergePatchBytes(v1.ConfigMap{Data: map[string]string{}},
patchBytes, err := patch.StrategicMergePatchBytes(v1.ConfigMap{Data: map[string]string{}},
v1.ConfigMap{Data: map[string]string{asmReady: status}}, v1.ConfigMap{})
if err != nil {
c.RecordEvent("Warning", "FailedToUpdateASMStatus", fmt.Sprintf("Failed to update ASM Status, failed to create patch for ASM ConfigMap, error: %s", err))
Expand Down
4 changes: 2 additions & 2 deletions pkg/experimental/workload/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
workloadv1a1 "k8s.io/ingress-gce/pkg/experimental/apis/workload/v1alpha1"
workloadclient "k8s.io/ingress-gce/pkg/experimental/workload/client/clientset/versioned"
daemonutils "k8s.io/ingress-gce/pkg/experimental/workload/daemon/utils"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/patch"
"k8s.io/klog"
)

Expand Down Expand Up @@ -177,7 +177,7 @@ func OutputCredentials(credentials daemonutils.ClusterCredentials) {

// preparePatchBytesforWorkloadStatus generates patch bytes based on the old and new workload status
func preparePatchBytesforWorkloadStatus(oldStatus, newStatus workloadv1a1.WorkloadStatus) ([]byte, error) {
patchBytes, err := utils.StrategicMergePatchBytes(
patchBytes, err := patch.StrategicMergePatchBytes(
workloadv1a1.Workload{Status: oldStatus},
workloadv1a1.Workload{Status: newStatus},
workloadv1a1.Workload{},
Expand Down
39 changes: 9 additions & 30 deletions pkg/l4/l4controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,16 @@ limitations under the License.
package l4

import (
context2 "context"
"fmt"
"reflect"
"sync"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/cloud-provider/service/helpers"
"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/backends"
"k8s.io/ingress-gce/pkg/context"
Expand All @@ -40,6 +37,7 @@ import (
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/common"
"k8s.io/ingress-gce/pkg/utils/namer"
"k8s.io/ingress-gce/pkg/utils/patch"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/apis/core/v1/helper"
)
Expand Down Expand Up @@ -186,7 +184,7 @@ func (l4c *L4Controller) processServiceCreateOrUpdate(key string, service *v1.Se
}
l4c.ctx.Recorder(service.Namespace).Eventf(service, v1.EventTypeNormal, "SyncLoadBalancerSuccessful",
"Successfully ensured load balancer resources")
if err = l4c.updateAnnotations(service.Name, service.Namespace, annotationsMap); err != nil {
if err = l4c.updateAnnotations(service, annotationsMap); err != nil {
l4c.ctx.Recorder(service.Namespace).Eventf(service, v1.EventTypeWarning, "SyncLoadBalancerFailed",
"Failed to update annotations for load balancer, err: %v", err)
return fmt.Errorf("failed to set resource annotations, err: %v", err)
Expand All @@ -202,7 +200,7 @@ func (l4c *L4Controller) processServiceDeletion(key string, svc *v1.Service) err
return err
}
// Also remove any ILB annotations from the service metadata
if err := l4c.updateAnnotations(svc.Name, svc.Namespace, nil); err != nil {
if err := l4c.updateAnnotations(svc, nil); err != nil {
l4c.ctx.Recorder(svc.Namespace).Eventf(svc, v1.EventTypeWarning, "DeleteLoadBalancer",
"Error resetting resource annotations for load balancer: %v", err)
return fmt.Errorf("failed to reset resource annotations, err: %v", err)
Expand Down Expand Up @@ -272,36 +270,17 @@ func (l4c *L4Controller) updateServiceStatus(svc *v1.Service, newStatus *v1.Load
if helper.LoadBalancerStatusEqual(&svc.Status.LoadBalancer, newStatus) {
return nil
}
updated := svc.DeepCopy()
updated.Status.LoadBalancer = *newStatus
if _, err := helpers.PatchService(l4c.ctx.KubeClient.CoreV1(), svc, updated); err != nil {
return err
}
return nil
return patch.PatchServiceLoadBalancerStatus(l4c.ctx.KubeClient.CoreV1(), svc, *newStatus)
}

func (l4c *L4Controller) updateServiceMetadata(svc *v1.Service, newObjectMetadata metav1.ObjectMeta) error {
updated := svc.DeepCopy()
updated.ObjectMeta = newObjectMetadata
if _, err := helpers.PatchService(l4c.ctx.KubeClient.CoreV1(), svc, updated); err != nil {
return err
}
return nil
}

func (l4c *L4Controller) updateAnnotations(name, namespace string, newILBAnnotations map[string]string) error {
svcClient := l4c.ctx.KubeClient.CoreV1().Services(namespace)
currSvc, err := svcClient.Get(context2.TODO(), name, metav1.GetOptions{})
if err != nil {
return err
}
newObjectMeta := currSvc.ObjectMeta.DeepCopy()
func (l4c *L4Controller) updateAnnotations(svc *v1.Service, newILBAnnotations map[string]string) error {
newObjectMeta := svc.ObjectMeta.DeepCopy()
newObjectMeta.Annotations = mergeAnnotations(newObjectMeta.Annotations, newILBAnnotations)
if reflect.DeepEqual(currSvc.Annotations, newObjectMeta.Annotations) {
if reflect.DeepEqual(svc.Annotations, newObjectMeta.Annotations) {
return nil
}
klog.V(3).Infof("Updating annotations of service %v/%v", namespace, name)
return l4c.updateServiceMetadata(currSvc, *newObjectMeta)
klog.V(3).Infof("Patching annotations of service %v/%v", svc.Namespace, svc.Name)
return patch.PatchServiceObjectMetadata(l4c.ctx.KubeClient.CoreV1(), svc, *newObjectMeta)
}

// mergeAnnotations merges the new set of ilb resource annotations with the pre-existing service annotations.
Expand Down
24 changes: 12 additions & 12 deletions pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/common"
namer2 "k8s.io/ingress-gce/pkg/utils/namer"
"k8s.io/ingress-gce/pkg/utils/patch"
"k8s.io/klog"
)

Expand Down Expand Up @@ -624,20 +625,19 @@ func (c *Controller) syncNegStatusAnnotation(namespace, name string, portMap neg
if err != nil {
return err
}
svcClient := c.client.CoreV1().Services(namespace)
service, err := svcClient.Get(context2.TODO(), name, metav1.GetOptions{})
coreClient := c.client.CoreV1()
service, err := coreClient.Services(namespace).Get(context2.TODO(), name, metav1.GetOptions{})
if err != nil {
return err
}

// Remove NEG Status Annotation when no NEG is needed
if len(portMap) == 0 {
if _, ok := service.Annotations[annotations.NEGStatusKey]; ok {
// TODO: use PATCH to remove annotation
delete(service.Annotations, annotations.NEGStatusKey)
newSvcObjectMeta := service.ObjectMeta.DeepCopy()
delete(newSvcObjectMeta.Annotations, annotations.NEGStatusKey)
klog.V(2).Infof("Removing NEG status annotation from service: %s/%s", namespace, name)
_, err = svcClient.Update(context2.TODO(), service, metav1.UpdateOptions{})
return err
return patch.PatchServiceObjectMetadata(coreClient, service, *newSvcObjectMeta)
}
// service doesn't have the expose NEG annotation and doesn't need update
return nil
Expand All @@ -652,14 +652,14 @@ func (c *Controller) syncNegStatusAnnotation(namespace, name string, portMap neg
if ok && existingAnnotation == annotation {
return nil
}
newSvcObjectMeta := service.ObjectMeta.DeepCopy()
// If enableCSM=true, it's possible a service having nil Annotations.
if service.Annotations == nil {
service.Annotations = make(map[string]string)
if newSvcObjectMeta.Annotations == nil {
newSvcObjectMeta.Annotations = make(map[string]string)
}
service.Annotations[annotations.NEGStatusKey] = annotation
newSvcObjectMeta.Annotations[annotations.NEGStatusKey] = annotation
klog.V(2).Infof("Updating NEG visibility annotation %q on service %s/%s.", annotation, namespace, name)
_, err = svcClient.Update(context2.TODO(), service, metav1.UpdateOptions{})
return err
return patch.PatchServiceObjectMetadata(coreClient, service, *newSvcObjectMeta)
}

// syncDestinationRuleNegStatusAnnotation syncs the destinationrule related neg status annotation
Expand Down Expand Up @@ -693,7 +693,7 @@ func (c *Controller) syncDestinationRuleNegStatusAnnotation(namespace, destinati
newDestinationRule := destinationRule.DeepCopy()
newDestinationRule.SetAnnotations(drAnnotations)
// Get the diff, we only need the Object meta diff.
patchBytes, err := utils.StrategicMergePatchBytes(destinationRule, newDestinationRule, struct {
patchBytes, err := patch.StrategicMergePatchBytes(destinationRule, newDestinationRule, struct {
metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
}{})
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/neg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/common"
"k8s.io/ingress-gce/pkg/utils/patch"
"k8s.io/klog"
utilpointer "k8s.io/utils/pointer"
)
Expand Down Expand Up @@ -576,7 +577,7 @@ func deleteSvcNegCR(svcNegClient svcnegclient.Interface, negCR *negv1beta1.Servi

// patchNegStatus patches the specified NegCR status with the provided new status
func patchNegStatus(svcNegClient svcnegclient.Interface, oldNeg, newNeg negv1beta1.ServiceNetworkEndpointGroup) (*negv1beta1.ServiceNetworkEndpointGroup, error) {
patchBytes, err := utils.MergePatchBytes(oldNeg, newNeg)
patchBytes, err := patch.MergePatchBytes(oldNeg, newNeg)
if err != nil {
return nil, fmt.Errorf("failed to prepare patch bytes: %s", err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/neg/readiness/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"k8s.io/client-go/tools/cache"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/ingress-gce/pkg/neg/types/shared"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/patch"
"k8s.io/klog"
)

Expand Down Expand Up @@ -113,7 +113,7 @@ func patchPodStatus(c clientset.Interface, namespace, name string, patchBytes []

// preparePatchBytesforPodStatus generates patch bytes based on the old and new pod status
func preparePatchBytesforPodStatus(oldPodStatus, newPodStatus v1.PodStatus) ([]byte, error) {
patchBytes, err := utils.StrategicMergePatchBytes(v1.Pod{Status: oldPodStatus}, v1.Pod{Status: newPodStatus}, v1.Pod{})
patchBytes, err := patch.StrategicMergePatchBytes(v1.Pod{Status: oldPodStatus}, v1.Pod{Status: newPodStatus}, v1.Pod{})
return patchBytes, err
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
"k8s.io/ingress-gce/pkg/neg/readiness"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/patch"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/apis/core"
)
Expand Down Expand Up @@ -546,7 +546,7 @@ func getNegFromStore(svcNegLister cache.Indexer, namespace, negName string) (*ne

// patchNegStatus patches the specified NegCR status with the provided new status
func patchNegStatus(svcNegClient svcnegclient.Interface, oldStatus, newStatus negv1beta1.ServiceNetworkEndpointGroupStatus, namespace, negName string) (*negv1beta1.ServiceNetworkEndpointGroup, error) {
patchBytes, err := utils.MergePatchBytes(negv1beta1.ServiceNetworkEndpointGroup{Status: oldStatus}, negv1beta1.ServiceNetworkEndpointGroup{Status: newStatus})
patchBytes, err := patch.MergePatchBytes(negv1beta1.ServiceNetworkEndpointGroup{Status: oldStatus}, negv1beta1.ServiceNetworkEndpointGroup{Status: newStatus})
if err != nil {
return nil, fmt.Errorf("failed to prepare patch bytes: %s", err)
}
Expand Down
48 changes: 11 additions & 37 deletions pkg/utils/common/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,14 @@ limitations under the License.
package common

import (
"context"
"encoding/json"
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/api/networking/v1beta1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/kubernetes"
coreclient "k8s.io/client-go/kubernetes/typed/core/v1"
client "k8s.io/client-go/kubernetes/typed/networking/v1beta1"
"k8s.io/ingress-gce/pkg/utils/patch"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/util/slice"
)
Expand Down Expand Up @@ -102,12 +98,12 @@ func EnsureServiceFinalizer(service *corev1.Service, key string, kubeClient kube
return nil
}

// Make a copy so we don't mutate the shared informer cache.
updated := service.DeepCopy()
updated.ObjectMeta.Finalizers = append(updated.ObjectMeta.Finalizers, key)
// Make a copy of object metadata so we don't mutate the shared informer cache.
updatedObjectMeta := service.ObjectMeta.DeepCopy()
updatedObjectMeta.Finalizers = append(updatedObjectMeta.Finalizers, key)

klog.V(2).Infof("Adding finalizer %s to service %s/%s", key, updated.Namespace, updated.Name)
return patchServiceFinalizer(kubeClient.CoreV1().Services(updated.Namespace), service, updated)
klog.V(2).Infof("Adding finalizer %s to service %s/%s", key, service.Namespace, service.Name)
return patch.PatchServiceObjectMetadata(kubeClient.CoreV1(), service, *updatedObjectMeta)
}

// removeFinalizer patches the service to remove finalizer.
Expand All @@ -116,32 +112,10 @@ func EnsureDeleteServiceFinalizer(service *corev1.Service, key string, kubeClien
return nil
}

// Make a copy so we don't mutate the shared informer cache.
updated := service.DeepCopy()
updated.ObjectMeta.Finalizers = slice.RemoveString(updated.ObjectMeta.Finalizers, key, nil)
// Make a copy of object metadata so we don't mutate the shared informer cache.
updatedObjectMeta := service.ObjectMeta.DeepCopy()
updatedObjectMeta.Finalizers = slice.RemoveString(updatedObjectMeta.Finalizers, key, nil)

klog.V(2).Infof("Removing finalizer from service %s/%s", updated.Namespace, updated.Name)
return patchServiceFinalizer(kubeClient.CoreV1().Services(updated.Namespace), service, updated)
}

func patchServiceFinalizer(sc coreclient.ServiceInterface, oldSvc, newSvc *corev1.Service) error {
svcKey := fmt.Sprintf("%s/%s", oldSvc.Namespace, oldSvc.Name)
oldData, err := json.Marshal(oldSvc)
if err != nil {
return fmt.Errorf("failed to Marshal oldData for service %s: %v", svcKey, err)
}

newData, err := json.Marshal(newSvc)
if err != nil {
return fmt.Errorf("failed to Marshal newData for service %s: %v", svcKey, err)
}

patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Service{})
if err != nil {
return fmt.Errorf("failed to create TwoWayMergePatch for service %s: %v", svcKey, err)
}

klog.V(3).Infof("Patch bytes for service %s: %s", svcKey, patchBytes)
_, err = sc.Patch(context.TODO(), oldSvc.Name, types.StrategicMergePatchType, patchBytes, meta_v1.PatchOptions{}, "status")
return err
klog.V(2).Infof("Removing finalizer from service %s/%s", service.Namespace, service.Name)
return patch.PatchServiceObjectMetadata(kubeClient.CoreV1(), service, *updatedObjectMeta)
}
24 changes: 23 additions & 1 deletion pkg/utils/patch.go → pkg/utils/patch/patch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,18 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package utils
package patch

import (
"encoding/json"
"fmt"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/jsonmergepatch"
"k8s.io/apimachinery/pkg/util/strategicpatch"
coreclient "k8s.io/client-go/kubernetes/typed/core/v1"
svchelpers "k8s.io/cloud-provider/service/helpers"
)

// StrategicMergePatchBytes returns a patch between the old and new object using a strategic merge patch.
Expand Down Expand Up @@ -62,3 +66,21 @@ func MergePatchBytes(old, cur interface{}) ([]byte, error) {

return patchBytes, nil
}

// PatchServiceObjectMetadata patches the given service's metadata based on new
// service metadata.
func PatchServiceObjectMetadata(client coreclient.CoreV1Interface, svc *corev1.Service, newObjectMetadata metav1.ObjectMeta) error {
newSvc := svc.DeepCopy()
newSvc.ObjectMeta = newObjectMetadata
_, err := svchelpers.PatchService(client, svc, newSvc)
return err
}

// PatchServiceLoadBalancerStatus patches the given service's LoadBalancerStatus
// based on new service's load-balancer status.
func PatchServiceLoadBalancerStatus(client coreclient.CoreV1Interface, svc *corev1.Service, newStatus corev1.LoadBalancerStatus) error {
newSvc := svc.DeepCopy()
newSvc.Status.LoadBalancer = newStatus
_, err := svchelpers.PatchService(client, svc, newSvc)
return err
}
Loading