Skip to content

Commit

Permalink
making ovn-kubernetes run without the egressFirewall CRD attacheced
Browse files Browse the repository at this point in the history
this will prevent the lister and informers from throwing errors. The
egressFirewallWatcher is dynamically created when the CRD is added

Signed-off-by: Jacob Tanenbaum <jtanenba@redhat.com>
  • Loading branch information
JacobTanenbaum committed Jul 30, 2020
1 parent 1f0893f commit e562133
Show file tree
Hide file tree
Showing 86 changed files with 30,787 additions and 57 deletions.
5 changes: 5 additions & 0 deletions dist/templates/ovn-setup.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ rules:
resources:
- egressfirewalls
verbs: ["list", "get", "watch", "update"]
- apiGroups:
- apiextensions.k8s.io
resources:
- customresourcedefinitions
verbs: ["list", "get", "watch"]


---
Expand Down
4 changes: 2 additions & 2 deletions go-controller/cmd/ovnkube/ovnkube.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,13 @@ func runOvnKube(ctx *cli.Context) error {
return fmt.Errorf("failed to initialize exec helper: %v", err)
}

clientset, egressFirewallClientset, err := util.NewClientsets(&config.Kubernetes)
clientset, egressFirewallClientset, crdClientset, err := util.NewClientsets(&config.Kubernetes)
if err != nil {
return err
}

// create factory and start the controllers asked for
factory, err := factory.NewWatchFactory(clientset, egressFirewallClientset)
factory, err := factory.NewWatchFactory(clientset, egressFirewallClientset, crdClientset)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion go-controller/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/warnings.v0 v0.1.2 // indirect
k8s.io/api v0.18.6
k8s.io/apiextensions-apiserver v0.18.6 // indirect
k8s.io/apiextensions-apiserver v0.18.6
k8s.io/apimachinery v0.18.6
k8s.io/client-go v0.18.6
k8s.io/klog v1.0.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func runHybridOverlay(ctx *cli.Context) error {
return fmt.Errorf("missing node name; use the 'node' flag to provide one")
}

clientset, _, err := util.NewClientsets(&config.Kubernetes)
clientset, _, _, err := util.NewClientsets(&config.Kubernetes)
if err != nil {
return err
}
Expand Down
77 changes: 65 additions & 12 deletions go-controller/pkg/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ import (
egressfirewallinformerfactory "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1/apis/informers/externalversions"
egressfirewalllister "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1/apis/listers/egressfirewall/v1"

apiextensionsapi "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apiextensionsscheme "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/scheme"
apiextensionsinformerfactory "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
apiextensionslister "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1beta1"

kapi "k8s.io/api/core/v1"
knet "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -308,6 +314,8 @@ func newInformerLister(oType reflect.Type, sharedInformer cache.SharedIndexInfor
return nil, nil
case egressFirewallType:
return egressfirewalllister.NewEgressFirewallLister(sharedInformer.GetIndexer()), nil
case crdType:
return apiextensionslister.NewCustomResourceDefinitionLister(sharedInformer.GetIndexer()), nil
}

return nil, fmt.Errorf("cannot create lister from type %v", oType)
Expand Down Expand Up @@ -398,11 +406,14 @@ type WatchFactory struct {
// requirements with atomic accesses
handlerCounter uint64

iFactory informerfactory.SharedInformerFactory
efFactory egressfirewallinformerfactory.SharedInformerFactory
informers map[reflect.Type]*informer
iFactory informerfactory.SharedInformerFactory
efFactory egressfirewallinformerfactory.SharedInformerFactory
efClientset egressfirewallclientset.Interface
crdFactory apiextensionsinformerfactory.SharedInformerFactory
informers map[reflect.Type]*informer

stopChan chan struct{}
stopChan chan struct{}
egressFirewallStopChan chan struct{}
}

// ObjectCacheInterface represents the exported methods for getting
Expand Down Expand Up @@ -439,24 +450,26 @@ var (
namespaceType reflect.Type = reflect.TypeOf(&kapi.Namespace{})
nodeType reflect.Type = reflect.TypeOf(&kapi.Node{})
egressFirewallType reflect.Type = reflect.TypeOf(&egressfirewallapi.EgressFirewall{})
crdType reflect.Type = reflect.TypeOf(&apiextensionsapi.CustomResourceDefinition{})
)

// NewWatchFactory initializes a new watch factory
func NewWatchFactory(c kubernetes.Interface, ec egressfirewallclientset.Interface) (*WatchFactory, error) {
func NewWatchFactory(c kubernetes.Interface, ec egressfirewallclientset.Interface, crd apiextensionsclientset.Interface) (*WatchFactory, error) {
// resync time is 12 hours, none of the resources being watched in ovn-kubernetes have
// any race condition where a resync may be required e.g. cni executable on node watching for
// events on pods and assuming that an 'ADD' event will contain the annotations put in by
// ovnkube master (currently, it is just a 'get' loop)
// the downside of making it tight (like 10 minutes) is needless spinning on all resources
wf := &WatchFactory{
iFactory: informerfactory.NewSharedInformerFactory(c, resyncInterval),
efFactory: egressfirewallinformerfactory.NewSharedInformerFactory(ec, resyncInterval),
informers: make(map[reflect.Type]*informer),
stopChan: make(chan struct{}),
iFactory: informerfactory.NewSharedInformerFactory(c, resyncInterval),
efClientset: ec,
crdFactory: apiextensionsinformerfactory.NewSharedInformerFactory(crd, resyncInterval),
informers: make(map[reflect.Type]*informer),
stopChan: make(chan struct{}),
}
var err error

err = egressfirewallapi.AddToScheme(egressfirewallscheme.Scheme)
err = apiextensionsapi.AddToScheme(apiextensionsscheme.Scheme)
if err != nil {
return nil, err
}
Expand All @@ -482,15 +495,20 @@ func NewWatchFactory(c kubernetes.Interface, ec egressfirewallclientset.Interfac
if err != nil {
return nil, err
}
wf.informers[egressFirewallType], err = newInformer(egressFirewallType, wf.efFactory.K8s().V1().EgressFirewalls().Informer())
wf.informers[crdType], err = newInformer(crdType, wf.crdFactory.Apiextensions().V1beta1().CustomResourceDefinitions().Informer())
if err != nil {
return nil, err
}
wf.informers[nodeType], err = newQueuedInformer(nodeType, wf.iFactory.Core().V1().Nodes().Informer(), wf.stopChan)
if err != nil {
return nil, err
}
wf.efFactory.Start(wf.stopChan)
wf.crdFactory.Start(wf.stopChan)
for oType, synced := range wf.crdFactory.WaitForCacheSync(wf.stopChan) {
if !synced {
return nil, fmt.Errorf("error in syncing cache for %v informer", oType)
}
}

wf.iFactory.Start(wf.stopChan)
for oType, synced := range wf.iFactory.WaitForCacheSync(wf.stopChan) {
Expand All @@ -502,6 +520,31 @@ func NewWatchFactory(c kubernetes.Interface, ec egressfirewallclientset.Interfac
return wf, nil
}

func (wf *WatchFactory) InitializeEgressFirewallWatchFactory() error {
err := egressfirewallapi.AddToScheme(egressfirewallscheme.Scheme)
if err != nil {
return err
}
wf.efFactory = egressfirewallinformerfactory.NewSharedInformerFactory(wf.efClientset, resyncInterval)
wf.informers[egressFirewallType], err = newInformer(egressFirewallType, wf.efFactory.K8s().V1().EgressFirewalls().Informer())
if err != nil {
return err
}
wf.egressFirewallStopChan = make(chan struct{})
wf.efFactory.Start(wf.egressFirewallStopChan)
for oType, synced := range wf.efFactory.WaitForCacheSync(wf.egressFirewallStopChan) {
if !synced {
return fmt.Errorf("error in syncing cache for %v informer", oType)
}
}
return nil
}

func (wf *WatchFactory) ShutdownEgressFirewallWatchFactory() {
close(wf.egressFirewallStopChan)
wf.informers[egressFirewallType].shutdown()
}

func (wf *WatchFactory) Shutdown() {
close(wf.stopChan)

Expand Down Expand Up @@ -663,6 +706,16 @@ func (wf *WatchFactory) RemoveEgressFirewallHandler(handler *Handler) error {
return wf.removeHandler(egressFirewallType, handler)
}

// AddCRDHandler adds a handler function that will be executed on CRD obje changes
func (wf *WatchFactory) AddCRDHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (*Handler, error) {
return wf.addHandler(crdType, "", nil, handlerFuncs, processExisting)
}

// RemoveCRDHandler removes a CRD object event handler function
func (wf *WatchFactory) RemoveCRDHandler(handler *Handler) error {
return wf.removeHandler(crdType, handler)
}

// AddNamespaceHandler adds a handler function that will be executed on Namespace object changes
func (wf *WatchFactory) AddNamespaceHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (*Handler, error) {
return wf.addHandler(namespaceType, "", nil, handlerFuncs, processExisting)
Expand Down
Loading

0 comments on commit e562133

Please sign in to comment.