Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup egress* clientset code #1589

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions dist/yaml/.gitignore
Original file line number Diff line number Diff line change
@@ -1,9 +1 @@
k8s.ovn.org_egressfirewalls.yaml
k8s.ovn.org_egressips.yaml
ovnkube-master.yaml
ovn-setup.yaml
ovnkube-db.yaml
ovnkube-node.yaml
ovnkube-monitor.yaml
ovnkube-db-raft.yaml
ovs-node.yaml
*.yaml
8 changes: 4 additions & 4 deletions go-controller/cmd/ovndbchecker/ovndbchecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,17 +177,17 @@ func runOvnKubeDBChecker(ctx *cli.Context) error {
return fmt.Errorf("failed to initialize exec helper: %v", err)
}

kclient, egressIPClient, egressFirewallClient, _, err := util.NewClientsets(&config.Kubernetes)
ovnClientset, err := util.NewOVNClientset(&config.Kubernetes)
if err != nil {
return err
}

stopChan := make(chan struct{})
ovndbmanager.RunDBChecker(
&kube.Kube{
KClient: kclient,
EIPClient: egressIPClient,
EgressFirewallClient: egressFirewallClient,
KClient: ovnClientset.KubeClient,
EIPClient: ovnClientset.EgressIPClient,
EgressFirewallClient: ovnClientset.EgressFirewallClient,
},
stopChan)
// run until cancelled
Expand Down
12 changes: 6 additions & 6 deletions go-controller/cmd/ovnkube/ovnkube.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,13 @@ func runOvnKube(ctx *cli.Context) error {
return fmt.Errorf("failed to initialize exec helper: %v", err)
}

clientset, egressIPClientset, egressFirewallClientset, crdClientset, err := util.NewClientsets(&config.Kubernetes)
ovnClientset, err := util.NewOVNClientset(&config.Kubernetes)
if err != nil {
return err
}

// create factory and start the controllers asked for
factory, err := factory.NewWatchFactory(clientset, egressIPClientset, egressFirewallClientset, crdClientset)
factory, err := factory.NewWatchFactory(ovnClientset)
if err != nil {
return err
}
Expand Down Expand Up @@ -245,8 +245,8 @@ func runOvnKube(ctx *cli.Context) error {
// since we capture some metrics in Start()
metrics.RegisterMasterMetrics(ovnNBClient, ovnSBClient)

ovnController := ovn.NewOvnController(clientset, egressIPClientset, egressFirewallClientset, factory, stopChan, nil, ovnNBClient, ovnSBClient, util.EventRecorder(clientset))
if err := ovnController.Start(clientset, master, wg); err != nil {
ovnController := ovn.NewOvnController(ovnClientset, factory, stopChan, nil, ovnNBClient, ovnSBClient, util.EventRecorder(ovnClientset.KubeClient))
if err := ovnController.Start(ovnClientset.KubeClient, master, wg); err != nil {
return err
}
}
Expand All @@ -258,7 +258,7 @@ func runOvnKube(ctx *cli.Context) error {
// register ovnkube node specific prometheus metrics exported by the node
metrics.RegisterNodeMetrics()
start := time.Now()
n := ovnnode.NewNode(clientset, factory, node, stopChan, util.EventRecorder(clientset))
n := ovnnode.NewNode(ovnClientset.KubeClient, factory, node, stopChan, util.EventRecorder(ovnClientset.KubeClient))
if err := n.Start(wg); err != nil {
return err
}
Expand All @@ -274,7 +274,7 @@ func runOvnKube(ctx *cli.Context) error {

// start the prometheus server to serve OVN Metrics (default port: 9476)
if config.Kubernetes.OVNMetricsBindAddress != "" {
metrics.RegisterOvnMetrics(clientset, node)
metrics.RegisterOvnMetrics(ovnClientset.KubeClient, node)
metrics.StartOVNMetricsServer(config.Kubernetes.OVNMetricsBindAddress)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func runHybridOverlay(ctx *cli.Context) error {
return fmt.Errorf("missing node name; use the 'node' flag to provide one")
}

clientset, _, _, _, err := util.NewClientsets(&config.Kubernetes)
clientset, err := util.NewKubernetesClientset(&config.Kubernetes)
if err != nil {
return err
}
Expand Down
16 changes: 7 additions & 9 deletions go-controller/pkg/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/config"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/metrics"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"

egressfirewallapi "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1"
egressfirewallclientset "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1/apis/clientset/versioned"
Expand All @@ -18,12 +19,10 @@ import (
egressfirewalllister "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1/apis/listers/egressfirewall/v1"

egressipapi "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressip/v1"
egressipclientset "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressip/v1/apis/clientset/versioned"
egressipscheme "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressip/v1/apis/clientset/versioned/scheme"
egressipinformerfactory "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressip/v1/apis/informers/externalversions"
egressiplister "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressip/v1/apis/listers/egressip/v1"
apiextensionsapi "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apiextensionsscheme "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/scheme"
apiextensionsinformerfactory "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
apiextensionslister "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1beta1"
Expand All @@ -33,7 +32,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
informerfactory "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
Expand Down Expand Up @@ -460,18 +458,18 @@ var (
)

// NewWatchFactory initializes a new watch factory
func NewWatchFactory(c kubernetes.Interface, eip egressipclientset.Interface, ec egressfirewallclientset.Interface, crd apiextensionsclientset.Interface) (*WatchFactory, error) {
// resync time is 0, none of the resources being watched in ovn-kubernetes have
func NewWatchFactory(ovnClientset *util.OVNClientset) (*WatchFactory, error) {
// resync time is 12 hours, none of the resources being watched in ovn-kubernetes have
// any race condition where a resync may be required e.g. cni executable on node watching for
// events on pods and assuming that an 'ADD' event will contain the annotations put in by
// ovnkube master (currently, it is just a 'get' loop)
// the downside of making it tight (like 10 minutes) is needless spinning on all resources
// However, AddEventHandlerWithResyncPeriod can specify a per handler resync period
wf := &WatchFactory{
iFactory: informerfactory.NewSharedInformerFactory(c, resyncInterval),
eipFactory: egressipinformerfactory.NewSharedInformerFactory(eip, resyncInterval),
efClientset: ec,
crdFactory: apiextensionsinformerfactory.NewSharedInformerFactory(crd, resyncInterval),
iFactory: informerfactory.NewSharedInformerFactory(ovnClientset.KubeClient, resyncInterval),
eipFactory: egressipinformerfactory.NewSharedInformerFactory(ovnClientset.EgressIPClient, resyncInterval),
efClientset: ovnClientset.EgressFirewallClient,
alexanderConstantinescu marked this conversation as resolved.
Show resolved Hide resolved
crdFactory: apiextensionsinformerfactory.NewSharedInformerFactory(ovnClientset.APIExtensionsClient, resyncInterval),
informers: make(map[reflect.Type]*informer),
stopChan: make(chan struct{}),
}
Expand Down
47 changes: 28 additions & 19 deletions go-controller/pkg/factory/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

egressfirewall "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1"
egressfirewallfake "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1/apis/clientset/versioned/fake"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"

apiextensions "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apiextensionsfake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
Expand Down Expand Up @@ -197,6 +198,7 @@ func (c *handlerCalls) getDeleted() int {

var _ = Describe("Watch Factory Operations", func() {
var (
ovnClientset *util.OVNClientset
fakeClient *fake.Clientset
egressIPFakeClient *egressipfake.Clientset
egressFirewallFakeClient *egressfirewallfake.Clientset
Expand Down Expand Up @@ -228,6 +230,13 @@ var _ = Describe("Watch Factory Operations", func() {
crdFakeClient = &apiextensionsfake.Clientset{}
egressIPFakeClient = &egressipfake.Clientset{}

ovnClientset = &util.OVNClientset{
KubeClient: fakeClient,
EgressIPClient: egressIPFakeClient,
EgressFirewallClient: egressFirewallFakeClient,
APIExtensionsClient: crdFakeClient,
}

pods = make([]*v1.Pod, 0)
podWatch = objSetup(fakeClient, "pods", func(core.Action) (bool, runtime.Object, error) {
obj := &v1.PodList{}
Expand Down Expand Up @@ -319,7 +328,7 @@ var _ = Describe("Watch Factory Operations", func() {

Context("when a processExisting is given", func() {
testExisting := func(objType reflect.Type, namespace string, sel labels.Selector) {
wf, err = NewWatchFactory(fakeClient, egressIPFakeClient, egressFirewallFakeClient, crdFakeClient)
wf, err = NewWatchFactory(ovnClientset)
Expect(err).NotTo(HaveOccurred())
err = wf.InitializeEgressFirewallWatchFactory()
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -394,7 +403,7 @@ var _ = Describe("Watch Factory Operations", func() {

Context("when existing items are known to the informer", func() {
testExisting := func(objType reflect.Type) {
wf, err = NewWatchFactory(fakeClient, egressIPFakeClient, egressFirewallFakeClient, crdFakeClient)
wf, err = NewWatchFactory(ovnClientset)
Expect(err).NotTo(HaveOccurred())
err = wf.InitializeEgressFirewallWatchFactory()
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -465,7 +474,7 @@ var _ = Describe("Watch Factory Operations", func() {

Context("when EgressIP is disabled", func() {
testExisting := func(objType reflect.Type) {
wf, err = NewWatchFactory(fakeClient, egressIPFakeClient, egressFirewallFakeClient, crdFakeClient)
wf, err = NewWatchFactory(ovnClientset)
Expect(err).NotTo(HaveOccurred())
Expect(wf.informers).NotTo(HaveKey(objType))
}
Expand Down Expand Up @@ -503,7 +512,7 @@ var _ = Describe("Watch Factory Operations", func() {
}

It("responds to pod add/update/delete events", func() {
wf, err = NewWatchFactory(fakeClient, egressIPFakeClient, egressFirewallFakeClient, crdFakeClient)
wf, err = NewWatchFactory(ovnClientset)
Expect(err).NotTo(HaveOccurred())

added := newPod("pod1", "default")
Expand Down Expand Up @@ -537,7 +546,7 @@ var _ = Describe("Watch Factory Operations", func() {
})

It("responds to multiple pod add/update/delete events", func() {
wf, err = NewWatchFactory(fakeClient, egressIPFakeClient, egressFirewallFakeClient, crdFakeClient)
wf, err = NewWatchFactory(ovnClientset)
Expect(err).NotTo(HaveOccurred())

const nodeName string = "mynode"
Expand Down Expand Up @@ -618,7 +627,7 @@ var _ = Describe("Watch Factory Operations", func() {
})

It("responds to namespace add/update/delete events", func() {
wf, err = NewWatchFactory(fakeClient, egressIPFakeClient, egressFirewallFakeClient, crdFakeClient)
wf, err = NewWatchFactory(ovnClientset)
Expect(err).NotTo(HaveOccurred())

added := newNamespace("default")
Expand Down Expand Up @@ -652,7 +661,7 @@ var _ = Describe("Watch Factory Operations", func() {
})

It("responds to node add/update/delete events", func() {
wf, err = NewWatchFactory(fakeClient, egressIPFakeClient, egressFirewallFakeClient, crdFakeClient)
wf, err = NewWatchFactory(ovnClientset)
Expect(err).NotTo(HaveOccurred())

added := newNode("mynode")
Expand Down Expand Up @@ -686,7 +695,7 @@ var _ = Describe("Watch Factory Operations", func() {
})

It("responds to multiple node add/update/delete events", func() {
wf, err = NewWatchFactory(fakeClient, egressIPFakeClient, egressFirewallFakeClient, crdFakeClient)
wf, err = NewWatchFactory(ovnClientset)
Expect(err).NotTo(HaveOccurred())

type opTest struct {
Expand Down Expand Up @@ -782,7 +791,7 @@ var _ = Describe("Watch Factory Operations", func() {
nodes = append(nodes, node)
}

wf, err = NewWatchFactory(fakeClient, egressIPFakeClient, egressFirewallFakeClient, crdFakeClient)
wf, err = NewWatchFactory(ovnClientset)
Expect(err).NotTo(HaveOccurred())

startWg := sync.WaitGroup{}
Expand Down Expand Up @@ -864,7 +873,7 @@ var _ = Describe("Watch Factory Operations", func() {
namespaces = append(namespaces, namespace)
}

wf, err = NewWatchFactory(fakeClient, egressIPFakeClient, egressFirewallFakeClient, crdFakeClient)
wf, err = NewWatchFactory(ovnClientset)
Expect(err).NotTo(HaveOccurred())

startWg := sync.WaitGroup{}
Expand Down Expand Up @@ -930,7 +939,7 @@ var _ = Describe("Watch Factory Operations", func() {
})

It("responds to policy add/update/delete events", func() {
wf, err = NewWatchFactory(fakeClient, egressIPFakeClient, egressFirewallFakeClient, crdFakeClient)
wf, err = NewWatchFactory(ovnClientset)
Expect(err).NotTo(HaveOccurred())

added := newPolicy("mypolicy", "default")
Expand Down Expand Up @@ -964,7 +973,7 @@ var _ = Describe("Watch Factory Operations", func() {
})

It("responds to endpoints add/update/delete events", func() {
wf, err = NewWatchFactory(fakeClient, egressIPFakeClient, egressFirewallFakeClient, crdFakeClient)
wf, err = NewWatchFactory(ovnClientset)
Expect(err).NotTo(HaveOccurred())

added := newEndpoints("myendpoints", "default")
Expand Down Expand Up @@ -1005,7 +1014,7 @@ var _ = Describe("Watch Factory Operations", func() {
})

It("responds to service add/update/delete events", func() {
wf, err = NewWatchFactory(fakeClient, egressIPFakeClient, egressFirewallFakeClient, crdFakeClient)
wf, err = NewWatchFactory(ovnClientset)
Expect(err).NotTo(HaveOccurred())

added := newService("myservice", "default")
Expand Down Expand Up @@ -1039,7 +1048,7 @@ var _ = Describe("Watch Factory Operations", func() {
})

It("responds to egressFirewall add/update/delete events", func() {
wf, err = NewWatchFactory(fakeClient, egressIPFakeClient, egressFirewallFakeClient, crdFakeClient)
wf, err = NewWatchFactory(ovnClientset)
err = wf.InitializeEgressFirewallWatchFactory()
Expect(err).NotTo(HaveOccurred())

Expand Down Expand Up @@ -1073,7 +1082,7 @@ var _ = Describe("Watch Factory Operations", func() {
wf.RemoveEgressFirewallHandler(h)
})
It("responds to crd add/update/delete events", func() {
wf, err = NewWatchFactory(fakeClient, egressIPFakeClient, egressFirewallFakeClient, crdFakeClient)
wf, err = NewWatchFactory(ovnClientset)
Expect(err).NotTo(HaveOccurred())

added := newCRD("crd1", "")
Expand Down Expand Up @@ -1107,7 +1116,7 @@ var _ = Describe("Watch Factory Operations", func() {

})
It("responds to egressIP add/update/delete events", func() {
wf, err = NewWatchFactory(fakeClient, egressIPFakeClient, egressFirewallFakeClient, crdFakeClient)
wf, err = NewWatchFactory(ovnClientset)
Expect(err).NotTo(HaveOccurred())

added := newEgressIP("myEgressIP", "default")
Expand Down Expand Up @@ -1140,7 +1149,7 @@ var _ = Describe("Watch Factory Operations", func() {
wf.RemoveEgressIPHandler(h)
})
It("stops processing events after the handler is removed", func() {
wf, err = NewWatchFactory(fakeClient, egressIPFakeClient, egressFirewallFakeClient, crdFakeClient)
wf, err = NewWatchFactory(ovnClientset)
Expect(err).NotTo(HaveOccurred())

added := newNamespace("default")
Expand Down Expand Up @@ -1169,7 +1178,7 @@ var _ = Describe("Watch Factory Operations", func() {
})

It("filters correctly by label and namespace", func() {
wf, err = NewWatchFactory(fakeClient, egressIPFakeClient, egressFirewallFakeClient, crdFakeClient)
wf, err = NewWatchFactory(ovnClientset)
Expect(err).NotTo(HaveOccurred())

passesFilter := newPod("pod1", "default")
Expand Down Expand Up @@ -1238,7 +1247,7 @@ var _ = Describe("Watch Factory Operations", func() {
})

It("correctly handles object updates that cause filter changes", func() {
wf, err = NewWatchFactory(fakeClient, egressIPFakeClient, egressFirewallFakeClient, crdFakeClient)
wf, err = NewWatchFactory(ovnClientset)
Expect(err).NotTo(HaveOccurred())

pod := newPod("pod1", "default")
Expand Down
4 changes: 2 additions & 2 deletions go-controller/pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func coverageShowMetricsUpdater(component string) {

// The `keepTrying` boolean when set to true will not return an error if we can't find pods with the given label.
// This is so that the caller can re-try again to see if the pods have appeared in the k8s cluster.
func checkPodRunsOnGivenNode(clientset *kubernetes.Clientset, label, k8sNodeName string,
func checkPodRunsOnGivenNode(clientset kubernetes.Interface, label, k8sNodeName string,
keepTrying bool) (bool, error) {
pods, err := clientset.CoreV1().Pods(config.Kubernetes.OVNConfigNamespace).List(context.TODO(), metav1.ListOptions{
LabelSelector: label,
Expand Down Expand Up @@ -204,7 +204,7 @@ func StartOVNMetricsServer(bindAddress string) {
}, 5*time.Second, utilwait.NeverStop)
}

func RegisterOvnMetrics(clientset *kubernetes.Clientset, k8sNodeName string) {
func RegisterOvnMetrics(clientset kubernetes.Interface, k8sNodeName string) {
go RegisterOvnDBMetrics(clientset, k8sNodeName)
go RegisterOvnControllerMetrics()
go RegisterOvnNorthdMetrics(clientset, k8sNodeName)
Expand Down
2 changes: 1 addition & 1 deletion go-controller/pkg/metrics/ovn_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func getOvnDbVersionInfo() {
}
}

func RegisterOvnDBMetrics(clientset *kubernetes.Clientset, k8sNodeName string) {
func RegisterOvnDBMetrics(clientset kubernetes.Interface, k8sNodeName string) {
err := wait.PollImmediate(1*time.Second, 300*time.Second, func() (bool, error) {
return checkPodRunsOnGivenNode(clientset, "name=ovnkube-db", k8sNodeName, false)
})
Expand Down
2 changes: 1 addition & 1 deletion go-controller/pkg/metrics/ovn_northd.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ var ovnNorthdCoverageShowMetricsMap = map[string]*metricDetails{
},
}

func RegisterOvnNorthdMetrics(clientset *kubernetes.Clientset, k8sNodeName string) {
func RegisterOvnNorthdMetrics(clientset kubernetes.Interface, k8sNodeName string) {
err := wait.PollImmediate(1*time.Second, 300*time.Second, func() (bool, error) {
return checkPodRunsOnGivenNode(clientset, "name=ovnkube-master", k8sNodeName, true)
})
Expand Down
Loading