Skip to content

Commit

Permalink
syncer writes neg desc on initialization
Browse files Browse the repository at this point in the history
  - update initialized status on NEG CRs when neg is initialized
  - update synced condition after sync completes
  - expose errors during sync and initializations in NEG CR
  - set needInit to false if no errors during neg initialization
  • Loading branch information
swetharepakula committed Jul 28, 2020
1 parent 7f1bf5b commit ff8c894
Show file tree
Hide file tree
Showing 12 changed files with 1,187 additions and 17 deletions.
2 changes: 1 addition & 1 deletion pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func NewController(
if enableNegCrd {
svcNegInformer = ctx.SvcNegInformer.GetIndexer()
}
manager := newSyncerManager(namer, recorder, cloud, zoneGetter, ctx.SvcNegClient, ctx.PodInformer.GetIndexer(), ctx.ServiceInformer.GetIndexer(), ctx.EndpointInformer.GetIndexer(), ctx.NodeInformer.GetIndexer(), svcNegInformer)
manager := newSyncerManager(namer, recorder, cloud, zoneGetter, ctx.SvcNegClient, ctx.KubeSystemUID, ctx.PodInformer.GetIndexer(), ctx.ServiceInformer.GetIndexer(), ctx.EndpointInformer.GetIndexer(), ctx.NodeInformer.GetIndexer(), svcNegInformer)
var reflector readiness.Reflector
if enableReadinessReflector {
reflector = readiness.NewReadinessReflector(ctx, manager)
Expand Down
10 changes: 9 additions & 1 deletion pkg/neg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -77,9 +78,12 @@ type syncerManager struct {
reflector readiness.Reflector
//svcNegClient handles lifecycle operations for NEG CRs
svcNegClient svcnegclient.Interface

// kubeSystemUID is used by syncers when NEG CRD is enabled
kubeSystemUID types.UID
}

func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter negtypes.ZoneGetter, svcNegClient svcnegclient.Interface, podLister, serviceLister, endpointLister, nodeLister, svcNegLister cache.Indexer) *syncerManager {
func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter negtypes.ZoneGetter, svcNegClient svcnegclient.Interface, kubeSystemUID types.UID, podLister, serviceLister, endpointLister, nodeLister, svcNegLister cache.Indexer) *syncerManager {
return &syncerManager{
namer: namer,
recorder: recorder,
Expand All @@ -93,6 +97,7 @@ func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer, recorder record.
svcPortMap: make(map[serviceKey]negtypes.PortInfoMap),
syncerMap: make(map[negtypes.NegSyncerKey]negtypes.NegSyncer),
svcNegClient: svcNegClient,
kubeSystemUID: kubeSystemUID,
}
}

Expand Down Expand Up @@ -149,8 +154,11 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg
manager.serviceLister,
manager.endpointLister,
manager.nodeLister,
manager.svcNegLister,
manager.reflector,
epc,
string(manager.kubeSystemUID),
manager.svcNegClient,
)
manager.syncerMap[syncerKey] = syncer
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/neg/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func NewTestSyncerManagerWithNegClient(kubeClient kubernetes.Interface, svcNegCl
ResyncPeriod: 0 * time.Second,
DefaultBackendSvcPort: defaultBackend,
}
context := context.NewControllerContext(nil, kubeClient, backendConfigClient, nil, svcNegClient, gce.NewFakeGCECloud(gce.DefaultTestClusterValues()), namer, "" /*kubeSystemUID*/, ctxConfig)
context := context.NewControllerContext(nil, kubeClient, backendConfigClient, nil, svcNegClient, gce.NewFakeGCECloud(gce.DefaultTestClusterValues()), namer, "kube-system-uid", ctxConfig)

var svcNegInformer cache.Indexer
if svcNegClient != nil {
Expand All @@ -99,6 +99,7 @@ func NewTestSyncerManagerWithNegClient(kubeClient kubernetes.Interface, svcNegCl
negtypes.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network"),
negtypes.NewFakeZoneGetter(),
svcNegClient,
context.KubeSystemUID,
context.PodInformer.GetIndexer(),
context.ServiceInformer.GetIndexer(),
context.EndpointInformer.GetIndexer(),
Expand Down
1 change: 1 addition & 0 deletions pkg/neg/syncers/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
testServiceName = "test-name"
testNamedPort = "named-Port"
clusterID = "clusterid"
kubeSystemUID = "kube-system-id"
)

var (
Expand Down
176 changes: 172 additions & 4 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package syncers

import (
"context"
"sync"

"fmt"
Expand All @@ -25,14 +26,19 @@ import (

apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
negv1beta1 "k8s.io/ingress-gce/pkg/apis/svcneg/v1beta1"
"k8s.io/ingress-gce/pkg/composite"
"k8s.io/ingress-gce/pkg/neg/readiness"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/apis/core"
)

type transactionSyncer struct {
Expand All @@ -58,6 +64,7 @@ type transactionSyncer struct {
serviceLister cache.Indexer
endpointLister cache.Indexer
nodeLister cache.Indexer
svcNegLister cache.Indexer
recorder record.EventRecorder
cloud negtypes.NetworkEndpointGroupCloud
zoneGetter negtypes.ZoneGetter
Expand All @@ -68,9 +75,15 @@ type transactionSyncer struct {

// reflector handles NEG readiness gate and conditions for pods in NEG.
reflector readiness.Reflector

//kubeSystemUID used to populate Cluster UID on Neg Description when using NEG CRD
kubeSystemUID string

//svcNegClient used to update status on corresponding NEG CRs when not nil
svcNegClient svcnegclient.Interface
}

func NewTransactionSyncer(negSyncerKey negtypes.NegSyncerKey, networkEndpointGroupName string, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter negtypes.ZoneGetter, podLister cache.Indexer, serviceLister cache.Indexer, endpointLister cache.Indexer, nodeLister cache.Indexer, reflector readiness.Reflector, epc negtypes.NetworkEndpointsCalculator) negtypes.NegSyncer {
func NewTransactionSyncer(negSyncerKey negtypes.NegSyncerKey, networkEndpointGroupName string, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter negtypes.ZoneGetter, podLister cache.Indexer, serviceLister cache.Indexer, endpointLister cache.Indexer, nodeLister cache.Indexer, svcNegLister cache.Indexer, reflector readiness.Reflector, epc negtypes.NetworkEndpointsCalculator, kubeSystemUID string, svcNegClient svcnegclient.Interface) negtypes.NegSyncer {
// TransactionSyncer implements the syncer core
ts := &transactionSyncer{
NegSyncerKey: negSyncerKey,
Expand All @@ -81,11 +94,14 @@ func NewTransactionSyncer(negSyncerKey negtypes.NegSyncerKey, networkEndpointGro
podLister: podLister,
serviceLister: serviceLister,
endpointLister: endpointLister,
svcNegLister: svcNegLister,
recorder: recorder,
cloud: cloud,
zoneGetter: zoneGetter,
endpointsCalculator: epc,
reflector: reflector,
kubeSystemUID: kubeSystemUID,
svcNegClient: svcNegClient,
}
// Syncer implements life cycle logic
syncer := newSyncer(negSyncerKey, networkEndpointGroupName, serviceLister, recorder, ts)
Expand Down Expand Up @@ -121,10 +137,16 @@ func (s *transactionSyncer) sync() error {
func (s *transactionSyncer) syncInternal() error {
s.syncLock.Lock()
defer s.syncLock.Unlock()
// NOTE: Error will be used to update the status on corresponding Neg CR if Neg CRD is enabled
// Please reuse and set err before returning
var err error
defer s.updateStatus(err)

if s.needInit {
if err := s.ensureNetworkEndpointGroups(); err != nil {
return err
}
s.needInit = false
}

if s.syncer.IsStopped() || s.syncer.IsShuttingDown() {
Expand Down Expand Up @@ -164,7 +186,8 @@ func (s *transactionSyncer) syncInternal() error {

targetMap, endpointPodMap, err := s.endpointsCalculator.CalculateEndpoints(ep.(*apiv1.Endpoints), currentMap)
if err != nil {
return fmt.Errorf("endpoints calculation error in mode %q, err: %v", s.endpointsCalculator.Mode(), err)
err = fmt.Errorf("endpoints calculation error in mode %q, err: %v", s.endpointsCalculator.Mode(), err)
return err
}
s.logStats(targetMap, "desired NEG endpoints")

Expand All @@ -191,7 +214,10 @@ func (s *transactionSyncer) syncInternal() error {
}
s.logEndpoints(addEndpoints, "adding endpoint")
s.logEndpoints(removeEndpoints, "removing endpoint")
return s.syncNetworkEndpoints(addEndpoints, removeEndpoints)

// set err instead of returning directly so that synced condition on neg crd is properly updated in defer
err = s.syncNetworkEndpoints(addEndpoints, removeEndpoints)
return err
}

// ensureNetworkEndpointGroups ensures NEGs are created and configured correctly in the corresponding zones.
Expand All @@ -203,11 +229,33 @@ func (s *transactionSyncer) ensureNetworkEndpointGroups() error {
}

var errList []error
var negObjRefs []negv1beta1.NegObjectReference
for _, zone := range zones {
if err := ensureNetworkEndpointGroup(s.Namespace, s.Name, s.negName, zone, s.NegSyncerKey.String(), s.NegSyncerKey.NegType, s.cloud, s.serviceLister, s.recorder, s.NegSyncerKey.GetAPIVersion()); err != nil {
var negObj negv1beta1.NegObjectReference
negObj, err = ensureNetworkEndpointGroup(
s.Namespace,
s.Name,
s.negName,
zone,
s.NegSyncerKey.String(),
s.kubeSystemUID,
fmt.Sprint(s.NegSyncerKey.PortTuple.Port),
s.NegSyncerKey.NegType,
s.cloud,
s.serviceLister,
s.recorder,
s.NegSyncerKey.GetAPIVersion(),
)
if err != nil {
errList = append(errList, err)
}

if s.svcNegClient != nil && err == nil {
negObjRefs = append(negObjRefs, negObj)
}
}

s.updateInitStatus(negObjRefs, errList)
return utilerrors.NewAggregate(errList)
}

Expand Down Expand Up @@ -416,3 +464,123 @@ func (s *transactionSyncer) logStats(endpointMap map[string]negtypes.NetworkEndp
// logEndpoints writes the given endpoint map to the log at verbosity level 3,
// prefixed with the syncer's NEG name and the caller-supplied description.
func (s *transactionSyncer) logEndpoints(endpointMap map[string]negtypes.NetworkEndpointSet, desc string) {
	const format = "For NEG %q, %s: %+v"
	klog.V(3).Infof(format, s.negName, desc, endpointMap)
}

// updateInitStatus records the outcome of NEG initialization on the NEG CR.
// It looks up the CR in the svcNegLister store, and on a copy of it:
//   - replaces Status.NetworkEndpointGroups with negObjRefs when negObjRefs is non-empty,
//   - ensures the Initialized condition reflects the aggregate of errList,
//
// then patches the CR status with the difference. Lookup and patch failures
// are only logged. When the syncer has no NEG client this is a no-op.
func (s *transactionSyncer) updateInitStatus(negObjRefs []negv1beta1.NegObjectReference, errList []error) {
	if s.svcNegClient == nil {
		return
	}

	cached, getErr := getNegFromStore(s.svcNegLister, s.Namespace, s.negName)
	if getErr != nil {
		klog.Errorf("failed getting neg from store: %s", getErr)
		return
	}

	// Work on a copy so the object held by the store is never mutated.
	updated := cached.DeepCopy()
	if len(negObjRefs) > 0 {
		updated.Status.NetworkEndpointGroups = negObjRefs
	}

	ensureCondition(
		updated,
		negv1beta1.Initialized,
		negtypes.NegInitializationSuccessful,
		negtypes.NegInitializationFailed,
		metav1.Now(),
		utilerrors.NewAggregate(errList),
	)

	if _, patchErr := patchNegStatus(s.svcNegClient, cached.Status, updated.Status, s.Namespace, s.negName); patchErr != nil {
		klog.Errorf("Error updating Neg CR %s : %s", s.negName, patchErr)
	}
}

// updateStatus records the outcome of a sync on the NEG CR. On a copy of the
// CR found in the svcNegLister store it ensures the Synced condition reflects
// syncErr and stamps LastSyncTime, then patches the CR status with the
// difference. If the CR has no Initialized condition or lists no
// NetworkEndpointGroups, needInit is set to true. Lookup and patch failures
// are only logged. When the syncer has no NEG client this is a no-op.
func (s *transactionSyncer) updateStatus(syncErr error) {
	if s.svcNegClient == nil {
		return
	}

	cached, getErr := getNegFromStore(s.svcNegLister, s.Namespace, s.negName)
	if getErr != nil {
		klog.Errorf("failed getting neg from store: %s", getErr)
		return
	}

	// Work on a copy so the object held by the store is never mutated.
	updated := cached.DeepCopy()

	now := metav1.Now()
	_, _, _, initialized := findCondition(updated.Status.Conditions, negv1beta1.Initialized)

	ensureCondition(updated, negv1beta1.Synced, negtypes.NegSyncSuccessful, negtypes.NegSyncFailed, now, syncErr)
	updated.Status.LastSyncTime = now

	// Either missing piece means the initialization status was never written.
	if !initialized || len(updated.Status.NetworkEndpointGroups) == 0 {
		s.needInit = true
	}

	if _, patchErr := patchNegStatus(s.svcNegClient, cached.Status, updated.Status, s.Namespace, s.negName); patchErr != nil {
		klog.Errorf("Error updating Neg CR %s : %s", s.negName, patchErr)
	}
}

// getNegFromStore looks up the NEG CR stored under "<namespace>/<negName>" in
// svcNegLister and returns it.
//
// An error is returned when the store lookup fails, when no object exists
// under that key, or when the stored object is not a
// *ServiceNetworkEndpointGroup. The returned pointer is the object obtained
// from the store, not a copy.
func getNegFromStore(svcNegLister cache.Indexer, namespace, negName string) (*negv1beta1.ServiceNetworkEndpointGroup, error) {
	obj, exists, err := svcNegLister.GetByKey(fmt.Sprintf("%s/%s", namespace, negName))
	if err != nil {
		return nil, fmt.Errorf("Error getting neg %s/%s from cache: %s", namespace, negName, err)
	}
	if !exists {
		return nil, fmt.Errorf("neg %s/%s is not in store", namespace, negName)
	}

	// Use the checked form of the assertion: an unexpected object type is
	// reported as an error instead of panicking the syncer.
	neg, ok := obj.(*negv1beta1.ServiceNetworkEndpointGroup)
	if !ok {
		return nil, fmt.Errorf("unexpected object of type %T stored for neg %s/%s", obj, namespace, negName)
	}
	return neg, nil
}

// patchNegStatus computes a merge patch between oldStatus and newStatus and
// applies it to the NEG CR named negName in namespace. It returns the result
// of the Patch call, or an error if the patch bytes could not be prepared.
func patchNegStatus(svcNegClient svcnegclient.Interface, oldStatus, newStatus negv1beta1.ServiceNetworkEndpointGroupStatus, namespace, negName string) (*negv1beta1.ServiceNetworkEndpointGroup, error) {
	// Wrap both statuses in otherwise-empty CRs so the patch only covers status.
	before := negv1beta1.ServiceNetworkEndpointGroup{Status: oldStatus}
	after := negv1beta1.ServiceNetworkEndpointGroup{Status: newStatus}

	patchBytes, err := utils.MergePatchBytes(before, after)
	if err != nil {
		return nil, fmt.Errorf("failed to prepare patch bytes: %s", err)
	}

	negs := svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(namespace)
	return negs.Patch(context.Background(), negName, types.MergePatchType, patchBytes, metav1.PatchOptions{})
}

// ensureCondition makes sure neg.Status.Conditions contains a condition of
// conditionType that reflects err, adding the condition if it is missing.
//
//   - err == nil and the condition is already True: the condition is left untouched.
//   - err == nil otherwise: Status becomes True, Reason becomes successReason
//     and Message is cleared.
//   - err != nil: Status becomes False, Reason becomes failureReason and
//     Message carries err.Error().
//
// Whenever the condition is rewritten its LastTransitionTime is set to ts;
// note this also happens on repeated failures, where Status does not change.
func ensureCondition(neg *negv1beta1.ServiceNetworkEndpointGroup, conditionType, successReason, failureReason string, ts metav1.Time, err error) {
	condition, index, conditions, _ := findCondition(neg.Status.Conditions, conditionType)
	// findCondition may have appended a fresh condition, so always store the
	// returned slice back on the object.
	neg.Status.Conditions = conditions

	if condition.Status == core.ConditionTrue && err == nil {
		return
	}

	condition.LastTransitionTime = ts
	if err != nil {
		// A concrete error means the operation is known to have failed, so the
		// condition is False; Unknown is for state that cannot be determined.
		condition.Status = core.ConditionFalse
		condition.Reason = failureReason
		condition.Message = err.Error()
	} else {
		condition.Status = core.ConditionTrue
		condition.Reason = successReason
		condition.Message = ""
	}
	conditions[index] = condition
}

// findCondition looks for a condition of type conditionType in conditions.
//
// It returns, in order: the condition, its index in the returned slice, the
// returned slice, and whether the condition was present in the input. When no
// match is found, a new condition carrying only that Type is appended, and the
// appended condition, its index and the extended slice are returned with the
// flag set to false.
func findCondition(conditions []negv1beta1.Condition, conditionType string) (negv1beta1.Condition, int, []negv1beta1.Condition, bool) {
	for idx := range conditions {
		if conditions[idx].Type == conditionType {
			return conditions[idx], idx, conditions, true
		}
	}

	created := negv1beta1.Condition{Type: conditionType}
	extended := append(conditions, created)
	return created, len(extended) - 1, extended, false
}
Loading

0 comments on commit ff8c894

Please sign in to comment.