diff --git a/go-controller/cmd/ovnkube/ovnkube.go b/go-controller/cmd/ovnkube/ovnkube.go index 9b872f9e9a9..d72390c775e 100644 --- a/go-controller/cmd/ovnkube/ovnkube.go +++ b/go-controller/cmd/ovnkube/ovnkube.go @@ -188,13 +188,13 @@ func runOvnKube(ctx *cli.Context) error { return fmt.Errorf("failed to initialize exec helper: %v", err) } - clientset, err := util.NewClientset(&config.Kubernetes) + clientset, egressFirewallClientset, err := util.NewClientset(&config.Kubernetes) if err != nil { return err } // create factory and start the controllers asked for - factory, err := factory.NewWatchFactory(clientset) + factory, err := factory.NewWatchFactory(clientset, egressFirewallClientset) if err != nil { return err } diff --git a/go-controller/go.sum b/go-controller/go.sum index 5d160844ef9..4c4aa0eed83 100644 --- a/go-controller/go.sum +++ b/go-controller/go.sum @@ -200,6 +200,7 @@ github.com/onsi/gomega v1.8.1/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoT github.com/opencontainers/go-digest v0.0.0-20180430190053-c9281466c8b2/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= github.com/opencontainers/runc v0.0.0-20190115041553-12f6a991201f/go.mod h1:qT5XzbpPznkRYVz/mWwUaVBUv2rmF59PVA73FjuZG0U= github.com/opencontainers/runtime-spec v1.0.2/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= +github.com/ovn-org/ovn-kubernetes v0.3.11 h1:xlSdIlvbiz/61WLKER8ovEYwvGAXpUlsneCppHYogBw= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= @@ -400,6 +401,7 @@ k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8= k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= k8s.io/kube-openapi v0.0.0-20191107075043-30be4d16710a h1:UcxjrRMyNx/i/y8G7kPvLyy7rfbeuf1PYyBf973pgyU= k8s.io/kube-openapi v0.0.0-20191107075043-30be4d16710a/go.mod h1:1TqjTSzOxsLGIKfj0lK8EeCP7K1iUG65v09OM0/WG5E= +k8s.io/kubernetes v1.13.0/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk= k8s.io/utils v0.0.0-20191114184206-e782cd3c129f h1:GiPwtSzdP43eI1hpPCbROQCCIgCuiMMNF8YUVLF3vJo= k8s.io/utils v0.0.0-20191114184206-e782cd3c129f/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew= sigs.k8s.io/structured-merge-diff v0.0.0-20190525122527-15d366b2352e/go.mod h1:wWxsB5ozmmv/SG7nM11ayaAW51xMvak/t1r0CSlcokI= diff --git a/go-controller/hybrid-overlay/cmd/hybrid-overlay-node/hybrid-overlay-node.go b/go-controller/hybrid-overlay/cmd/hybrid-overlay-node/hybrid-overlay-node.go index 2333d8a8740..9ef2895b0ba 100644 --- a/go-controller/hybrid-overlay/cmd/hybrid-overlay-node/hybrid-overlay-node.go +++ b/go-controller/hybrid-overlay/cmd/hybrid-overlay-node/hybrid-overlay-node.go @@ -83,7 +83,7 @@ func runHybridOverlay(ctx *cli.Context) error { return fmt.Errorf("missing node name; use the 'node' flag to provide one") } - clientset, err := util.NewClientset(&config.Kubernetes) + clientset, _, err := util.NewClientset(&config.Kubernetes) if err != nil { return err } diff --git a/go-controller/pkg/factory/factory.go b/go-controller/pkg/factory/factory.go index 104ad0fc599..30867d3b129 100644 --- a/go-controller/pkg/factory/factory.go +++ b/go-controller/pkg/factory/factory.go @@ -10,6 +10,12 @@ import ( "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/metrics" + egressfirewallapi "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1" + egressfirewallclientset "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1/apis/clientset/versioned" + egressfirewallscheme "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1/apis/clientset/versioned/scheme" + egressfirewallinformerfactory "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1/apis/informers/externalversions" + egressfirewalllister "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1/apis/listers/egressfirewall/v1" + kapi "k8s.io/api/core/v1" knet "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -111,6 +117,7 @@ func (i *informer) forEachHandler(obj interface{}, f func(h *Handler)) { } func (i *informer) addHandler(id uint64, filterFunc func(obj interface{}) bool, funcs cache.ResourceEventHandler, existingItems []interface{}) *Handler { + handler := &Handler{ cache.FilteringResourceEventHandler{ FilterFunc: filterFunc, @@ -300,6 +307,8 @@ func newInformerLister(oType reflect.Type, sharedInformer cache.SharedIndexInfor return listers.NewNodeLister(sharedInformer.GetIndexer()), nil case policyType: return nil, nil + case egressFirewallType: + return egressfirewalllister.NewEgressFirewallLister(sharedInformer.GetIndexer()), nil } return nil, fmt.Errorf("cannot create lister from type %v", oType) @@ -391,6 +400,7 @@ type WatchFactory struct { handlerCounter uint64 iFactory informerfactory.SharedInformerFactory + efFactory egressfirewallinformerfactory.SharedInformerFactory informers map[reflect.Type]*informer stopChan chan struct{} @@ -423,16 +433,17 @@ const ( ) var ( - podType reflect.Type = reflect.TypeOf(&kapi.Pod{}) - serviceType reflect.Type = reflect.TypeOf(&kapi.Service{}) - endpointsType reflect.Type = reflect.TypeOf(&kapi.Endpoints{}) - policyType reflect.Type = reflect.TypeOf(&knet.NetworkPolicy{}) - namespaceType reflect.Type = reflect.TypeOf(&kapi.Namespace{}) - nodeType reflect.Type = reflect.TypeOf(&kapi.Node{}) + podType reflect.Type = reflect.TypeOf(&kapi.Pod{}) + serviceType reflect.Type = reflect.TypeOf(&kapi.Service{}) + endpointsType reflect.Type = reflect.TypeOf(&kapi.Endpoints{}) + policyType reflect.Type = reflect.TypeOf(&knet.NetworkPolicy{}) + namespaceType reflect.Type = reflect.TypeOf(&kapi.Namespace{}) + nodeType reflect.Type = reflect.TypeOf(&kapi.Node{}) + egressFirewallType reflect.Type = reflect.TypeOf(&egressfirewallapi.EgressFirewall{}) ) // NewWatchFactory initializes a new watch factory -func NewWatchFactory(c kubernetes.Interface) (*WatchFactory, error) { +func NewWatchFactory(c kubernetes.Interface, ec egressfirewallclientset.Interface) (*WatchFactory, error) { // resync time is 12 hours, none of the resources being watched in ovn-kubernetes have // any race condition where a resync may be required e.g. cni executable on node watching for // events on pods and assuming that an 'ADD' event will contain the annotations put in by @@ -440,10 +451,17 @@ func NewWatchFactory(c kubernetes.Interface) (*WatchFactory, error) { // the downside of making it tight (like 10 minutes) is needless spinning on all resources wf := &WatchFactory{ iFactory: informerfactory.NewSharedInformerFactory(c, resyncInterval), + efFactory: egressfirewallinformerfactory.NewSharedInformerFactory(ec, resyncInterval), informers: make(map[reflect.Type]*informer), stopChan: make(chan struct{}), } var err error + + err = egressfirewallapi.AddToScheme(egressfirewallscheme.Scheme) + if err != nil { + return nil, err + } + // Create shared informers we know we'll use wf.informers[podType], err = newQueuedInformer(podType, wf.iFactory.Core().V1().Pods().Informer(), wf.stopChan) if err != nil { @@ -465,10 +483,15 @@ func NewWatchFactory(c kubernetes.Interface) (*WatchFactory, error) { if err != nil { return nil, err } + wf.informers[egressFirewallType], err = newInformer(egressFirewallType, wf.efFactory.K8s().V1().EgressFirewalls().Informer()) + if err != nil { + return nil, err + } wf.informers[nodeType], err = newQueuedInformer(nodeType, wf.iFactory.Core().V1().Nodes().Informer(), wf.stopChan) if err != nil { return nil, err } + wf.efFactory.Start(wf.stopChan) wf.iFactory.Start(wf.stopChan) for oType, synced := range wf.iFactory.WaitForCacheSync(wf.stopChan) { @@ -515,6 +538,10 @@ func getObjectMeta(objType reflect.Type, obj interface{}) (*metav1.ObjectMeta, e if node, ok := obj.(*kapi.Node); ok { return &node.ObjectMeta, nil } + case egressFirewallType: + if egressFirewall, ok := obj.(*egressfirewallapi.EgressFirewall); ok { + return &egressFirewall.ObjectMeta, nil + } } return nil, fmt.Errorf("cannot get ObjectMeta from type %v", objType) } @@ -627,6 +654,16 @@ func (wf *WatchFactory) RemovePolicyHandler(handler *Handler) error { return wf.removeHandler(policyType, handler) } +// AddEgressFirewallHandler adds a handler function that will be executed on EgressFirewall object changes +func (wf *WatchFactory) AddEgressFirewallHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (*Handler, error) { + return wf.addHandler(egressFirewallType, "", nil, handlerFuncs, processExisting) +} + +// RemoveEgressFirewallHandler removes an EgressFirewall object event handler function +func (wf *WatchFactory) RemoveEgressFirewallHandler(handler *Handler) error { + return wf.removeHandler(egressFirewallType, handler) +} + // AddNamespaceHandler adds a handler function that will be executed on Namespace object changes func (wf *WatchFactory) AddNamespaceHandler(handlerFuncs cache.ResourceEventHandler, processExisting func([]interface{})) (*Handler, error) { return wf.addHandler(namespaceType, "", nil, handlerFuncs, processExisting) diff --git a/go-controller/pkg/factory/factory_test.go b/go-controller/pkg/factory/factory_test.go index c665686767d..e3bae1e41e2 100644 --- a/go-controller/pkg/factory/factory_test.go +++ b/go-controller/pkg/factory/factory_test.go @@ -17,6 +17,9 @@ import ( core "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" + egressfirewall "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1" + egressfirewallfake "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1/apis/clientset/versioned/fake" + . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" ) @@ -98,6 +101,23 @@ func newService(name, namespace string) *v1.Service { } } +func newEgressFirewall(name, namespace string) *egressfirewall.EgressFirewall { + return &egressfirewall.EgressFirewall{ + ObjectMeta: newObjectMeta(name, namespace), + Spec: egressfirewall.EgressFirewallSpec{ + []egressfirewall.EgressFirewallRule{ + { + Type: egressfirewall.EgressFirewallRuleAllow, + To: egressfirewall.EgressFirewallDestination{ + CIDRSelector: "1.2.3.4/32", + }, + }, + }, + }, + } + +} + func objSetup(c *fake.Clientset, objType string, listFn func(core.Action) (bool, runtime.Object, error)) *watch.FakeWatcher { w := watch.NewFake() c.AddWatchReactor(objType, core.DefaultWatchReactor(w, nil)) @@ -105,6 +125,13 @@ func objSetup(c *fake.Clientset, objType string, listFn func(core.Action) (bool, return w } +func egressFirewallObjSetup(c *egressfirewallfake.Clientset, objType string, listFn func(core.Action) (bool, runtime.Object, error)) *watch.FakeWatcher { + w := watch.NewFake() + c.AddWatchReactor(objType, core.DefaultWatchReactor(w, nil)) + c.AddReactor("list", objType, listFn) + return w +} + type handlerCalls struct { added int32 updated int32 @@ -126,8 +153,10 @@ func (c *handlerCalls) getDeleted() int { var _ = Describe("Watch Factory Operations", func() { var ( fakeClient *fake.Clientset + egressFirewallFakeClient *egressfirewallfake.Clientset podWatch, namespaceWatch, nodeWatch *watch.FakeWatcher policyWatch, endpointsWatch, serviceWatch *watch.FakeWatcher + egressFirewallWatch *watch.FakeWatcher pods []*v1.Pod namespaces []*v1.Namespace nodes []*v1.Node @@ -135,11 +164,13 @@ var _ = Describe("Watch Factory Operations", func() { endpoints []*v1.Endpoints services []*v1.Service wf *WatchFactory + egressFirewalls []*egressfirewall.EgressFirewall err error ) BeforeEach(func() { fakeClient = &fake.Clientset{} + egressFirewallFakeClient = &egressfirewallfake.Clientset{} pods = make([]*v1.Pod, 0) podWatch = objSetup(fakeClient, "pods", func(core.Action) (bool, runtime.Object, error) { @@ -194,6 +225,15 @@ var _ = Describe("Watch Factory Operations", func() { } return true, obj, nil }) + + egressFirewalls = make([]*egressfirewall.EgressFirewall, 0) + egressFirewallWatch = egressFirewallObjSetup(egressFirewallFakeClient, "egressfirewalls", func(core.Action) (bool, runtime.Object, error) { + obj := &egressfirewall.EgressFirewallList{} + for _, p := range egressFirewalls { + obj.Items = append(obj.Items, *p) + } + return true, obj, nil + }) }) AfterEach(func() { @@ -202,7 +242,7 @@ var _ = Describe("Watch Factory Operations", func() { Context("when a processExisting is given", func() { testExisting := func(objType reflect.Type, namespace string, lsel *metav1.LabelSelector) { - wf, err = NewWatchFactory(fakeClient) + wf, err = NewWatchFactory(fakeClient, egressFirewallFakeClient) Expect(err).NotTo(HaveOccurred()) h, err := wf.addHandler(objType, namespace, lsel, cache.ResourceEventHandlerFuncs{}, @@ -245,6 +285,11 @@ var _ = Describe("Watch Factory Operations", func() { testExisting(serviceType, "", nil) }) + It("is called for each existing egressFirewall", func() { + egressFirewalls = append(egressFirewalls, newEgressFirewall("myEgressFirewall", "default")) + testExisting(egressFirewallType, "", nil) + }) + It("is called for each existing pod that matches a given namespace and label", func() { pod := newPod("pod1", "default") pod.ObjectMeta.Labels["blah"] = "foobar" @@ -257,7 +302,7 @@ var _ = Describe("Watch Factory Operations", func() { Context("when existing items are known to the informer", func() { testExisting := func(objType reflect.Type) { - wf, err = NewWatchFactory(fakeClient) + wf, err = NewWatchFactory(fakeClient, egressFirewallFakeClient) Expect(err).NotTo(HaveOccurred()) var addCalls int32 h, err := wf.addHandler(objType, "", nil, @@ -308,6 +353,11 @@ var _ = Describe("Watch Factory Operations", func() { services = append(services, newService("myservice2", "default")) testExisting(serviceType) }) + It("calls ADD for each existing egressFirewall", func() { + egressFirewalls = append(egressFirewalls, newEgressFirewall("myFirewall", "default")) + egressFirewalls = append(egressFirewalls, newEgressFirewall("myFirewall1", "default")) + testExisting(egressFirewallType) + }) }) addFilteredHandler := func(wf *WatchFactory, objType reflect.Type, namespace string, lsel *metav1.LabelSelector, funcs cache.ResourceEventHandlerFuncs) (*Handler, *handlerCalls) { @@ -339,7 +389,7 @@ var _ = Describe("Watch Factory Operations", func() { } It("responds to pod add/update/delete events", func() { - wf, err = NewWatchFactory(fakeClient) + wf, err = NewWatchFactory(fakeClient, egressFirewallFakeClient) Expect(err).NotTo(HaveOccurred()) added := newPod("pod1", "default") @@ -373,7 +423,7 @@ var _ = Describe("Watch Factory Operations", func() { }) It("responds to multiple pod add/update/delete events", func() { - wf, err = NewWatchFactory(fakeClient) + wf, err = NewWatchFactory(fakeClient, egressFirewallFakeClient) Expect(err).NotTo(HaveOccurred()) const nodeName string = "mynode" @@ -443,7 +493,7 @@ var _ = Describe("Watch Factory Operations", func() { }) It("responds to namespace add/update/delete events", func() { - wf, err = NewWatchFactory(fakeClient) + wf, err = NewWatchFactory(fakeClient, egressFirewallFakeClient) Expect(err).NotTo(HaveOccurred()) added := newNamespace("default") @@ -477,7 +527,7 @@ var _ = Describe("Watch Factory Operations", func() { }) It("responds to node add/update/delete events", func() { - wf, err = NewWatchFactory(fakeClient) + wf, err = NewWatchFactory(fakeClient, egressFirewallFakeClient) Expect(err).NotTo(HaveOccurred()) added := newNode("mynode") @@ -511,7 +561,7 @@ var _ = Describe("Watch Factory Operations", func() { }) It("responds to multiple node add/update/delete events", func() { - wf, err = NewWatchFactory(fakeClient) + wf, err = NewWatchFactory(fakeClient, egressFirewallFakeClient) Expect(err).NotTo(HaveOccurred()) type opTest struct { @@ -595,7 +645,7 @@ var _ = Describe("Watch Factory Operations", func() { nodes = append(nodes, node) } - wf, err = NewWatchFactory(fakeClient) + wf, err = NewWatchFactory(fakeClient, egressFirewallFakeClient) Expect(err).NotTo(HaveOccurred()) startWg := sync.WaitGroup{} @@ -668,7 +718,7 @@ var _ = Describe("Watch Factory Operations", func() { namespaces = append(namespaces, namespace) } - wf, err = NewWatchFactory(fakeClient) + wf, err = NewWatchFactory(fakeClient, egressFirewallFakeClient) Expect(err).NotTo(HaveOccurred()) startWg := sync.WaitGroup{} @@ -726,7 +776,7 @@ var _ = Describe("Watch Factory Operations", func() { }) It("responds to policy add/update/delete events", func() { - wf, err = NewWatchFactory(fakeClient) + wf, err = NewWatchFactory(fakeClient, egressFirewallFakeClient) Expect(err).NotTo(HaveOccurred()) added := newPolicy("mypolicy", "default") @@ -760,7 +810,7 @@ var _ = Describe("Watch Factory Operations", func() { }) It("responds to endpoints add/update/delete events", func() { - wf, err = NewWatchFactory(fakeClient) + wf, err = NewWatchFactory(fakeClient, egressFirewallFakeClient) Expect(err).NotTo(HaveOccurred()) added := newEndpoints("myendpoints", "default") @@ -801,7 +851,7 @@ var _ = Describe("Watch Factory Operations", func() { }) It("responds to service add/update/delete events", func() { - wf, err = NewWatchFactory(fakeClient) + wf, err = NewWatchFactory(fakeClient, egressFirewallFakeClient) Expect(err).NotTo(HaveOccurred()) added := newService("myservice", "default") @@ -834,8 +884,42 @@ var _ = Describe("Watch Factory Operations", func() { wf.RemoveServiceHandler(h) }) + It("responds to egressFirewall add/update/delete events", func() { + wf, err = NewWatchFactory(fakeClient, egressFirewallFakeClient) + Expect(err).NotTo(HaveOccurred()) + + added := newEgressFirewall("myEgressFirewall", "default") + h, c := addHandler(wf, egressFirewallType, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + egressFirewall := obj.(*egressfirewall.EgressFirewall) + Expect(reflect.DeepEqual(egressFirewall, added)).To(BeTrue()) + }, + UpdateFunc: func(old, new interface{}) { + newEgressFirewall := new.(*egressfirewall.EgressFirewall) + Expect(reflect.DeepEqual(newEgressFirewall, added)).To(BeTrue()) + Expect(newEgressFirewall.Spec.Egress[0].Type).To(Equal(egressfirewall.EgressFirewallRuleDeny)) + }, + DeleteFunc: func(obj interface{}) { + egressFirewall := obj.(*egressfirewall.EgressFirewall) + Expect(reflect.DeepEqual(egressFirewall, added)).To(BeTrue()) + }, + }) + + egressFirewalls = append(egressFirewalls, added) + egressFirewallWatch.Add(added) + Eventually(c.getAdded, 2).Should(Equal(1)) + added.Spec.Egress[0].Type = egressfirewall.EgressFirewallRuleDeny + egressFirewallWatch.Modify(added) + Eventually(c.getUpdated, 2).Should(Equal(1)) + egressFirewalls = egressFirewalls[:0] + egressFirewallWatch.Delete(added) + Eventually(c.getDeleted, 2).Should(Equal(1)) + + wf.RemoveEgressFirewallHandler(h) + }) + It("stops processing events after the handler is removed", func() { - wf, err = NewWatchFactory(fakeClient) + wf, err = NewWatchFactory(fakeClient, egressFirewallFakeClient) Expect(err).NotTo(HaveOccurred()) added := newNamespace("default") @@ -864,7 +948,7 @@ var _ = Describe("Watch Factory Operations", func() { }) It("filters correctly by label and namespace", func() { - wf, err = NewWatchFactory(fakeClient) + wf, err = NewWatchFactory(fakeClient, egressFirewallFakeClient) Expect(err).NotTo(HaveOccurred()) passesFilter := newPod("pod1", "default") @@ -928,7 +1012,7 @@ var _ = Describe("Watch Factory Operations", func() { }) It("correctly handles object updates that cause filter changes", func() { - wf, err = NewWatchFactory(fakeClient) + wf, err = NewWatchFactory(fakeClient, egressFirewallFakeClient) Expect(err).NotTo(HaveOccurred()) pod := newPod("pod1", "default") diff --git a/go-controller/pkg/ovn/egressfirewall.go b/go-controller/pkg/ovn/egressfirewall.go new file mode 100644 index 00000000000..276dbd2b706 --- /dev/null +++ b/go-controller/pkg/ovn/egressfirewall.go @@ -0,0 +1,285 @@ +package ovn + +import ( + "fmt" + "net" + "reflect" + "strings" + "sync" + + "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util" + + egressfirewallapi "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1" + + "k8s.io/klog" +) + +const ( + defaultStartPriority = 2000 +) + +type egressFirewall struct { + sync.Mutex //not sure if needed + name string + namespace string + egressRules []*egressFirewallRule +} + +type egressFirewallRule struct { + id int + access egressfirewallapi.EgressFirewallRuleType + ports []*port + to destination +} + +type port struct { + protocol *string + portNum *int32 +} + +type destination struct { + cidrSelector string +} + +func newEgressFirewall(egressFirewallPolicy *egressfirewallapi.EgressFirewall) *egressFirewall { + ef := &egressFirewall{ + name: egressFirewallPolicy.Name, + namespace: egressFirewallPolicy.Namespace, + egressRules: make([]*egressFirewallRule, 0), + } + return ef +} + +func newEgressFirewallRule(rawEgressFirewallRule egressfirewallapi.EgressFirewallRule, id int) (*egressFirewallRule, error) { + efr := &egressFirewallRule{ + id: id, + access: rawEgressFirewallRule.Type, + ports: make([]*port, 0), + } + + if rawEgressFirewallRule.To.CIDRSelector == "" { + return nil, fmt.Errorf("EgressFirewallRule must have a CIDRselector set") + } + + _, _, err := net.ParseCIDR(rawEgressFirewallRule.To.CIDRSelector) + if err != nil { + return nil, err + + } + efr.to.cidrSelector = rawEgressFirewallRule.To.CIDRSelector + + for _, rawPort := range rawEgressFirewallRule.Ports { + port := port{} + if rawPort.Protocol != nil { + port.protocol = rawPort.Protocol + } + if rawPort.PortNum != nil { + port.portNum = rawPort.PortNum + } + efr.ports = append(efr.ports, &port) + } + + return efr, nil +} + +func (oc *Controller) addEgressFirewall(egressFirewall *egressfirewallapi.EgressFirewall) { + klog.Infof("Adding egress Firewall %s in namesapce %s", egressFirewall.Name, egressFirewall.Namespace) + nsInfo, err := oc.waitForNamespaceLocked(egressFirewall.Namespace) + if err != nil { + klog.Errorf("failed to wait for namespace %s event (%v)", + egressFirewall.Namespace, err) + return + } + defer nsInfo.Unlock() + + if nsInfo.egressFirewall { + klog.Errorf("Attempting to add egressFirewall %s to namespace %s when it already has an egressFirewall", + egressFirewall.Name, egressFirewall.Namespace) + return + } + + ef := newEgressFirewall(egressFirewall) + nsInfo.egressFirewallPolicy = ef + //lock the newgressFirewall and unlock nsInfo + for i, egressFirewallRule := range egressFirewall.Spec.Egress { + //process Rules into egressFirewallRules for egressFirewall struct + efr, err := newEgressFirewallRule(egressFirewallRule, i) + if err != nil { + klog.Errorf("Cannot create EgressFirewall Rule for destination %s to namespace %s - %v", + egressFirewallRule.To.CIDRSelector, egressFirewall.Namespace, err) + continue + + } + ef.egressRules = append(ef.egressRules, efr) + } + + existingNodes, err := oc.kube.GetNodes() + if err != nil { + klog.Errorf("unable to add egressfirewall %s, cannot list nodes: %s", egressFirewall.Name, err) + } + for _, node := range existingNodes.Items { + joinSwitch := joinSwitch(node.Name) + err = ef.addACLToJoinSwitch(joinSwitch, nsInfo.addressSet.GetHashName()) + if err != nil { + klog.Errorf("%s", err) + } + } + +} + +func (oc *Controller) updateEgressFirewall(oldEgressFirewall, newEgressFirewall *egressfirewallapi.EgressFirewall) { + if !reflect.DeepEqual(oldEgressFirewall.Spec, newEgressFirewall.Spec) { + oc.deleteEgressFirewall(oldEgressFirewall) + oc.addEgressFirewall(newEgressFirewall) + } +} + +func (oc *Controller) deleteEgressFirewall(egressFirewall *egressfirewallapi.EgressFirewall) { + klog.Infof("Deleting egress Firewall %s in namespace %s", egressFirewall.Name, egressFirewall.Namespace) + + nsInfo, err := oc.waitForNamespaceLocked(egressFirewall.Namespace) + if err != nil { + klog.Errorf("failed to wait for namespace %s event (%v)", + egressFirewall.Namespace, err) + } + defer nsInfo.Unlock() + nsInfo.egressFirewall = false + + stdout, stderr, err := util.RunOVNNbctl("--data=bare", "--no-heading", + "--columns=_uuid", "find", "ACL", + fmt.Sprintf("external-ids:egressFirewall=%s", egressFirewall.Namespace)) + if err != nil { + klog.Errorf(" error executing create ACL command, stderr: %q, %+v", stderr, err) + //none of the elements of the egressfirewall have been deleted + nsInfo.egressFirewall = true + return + } + uuids := strings.Fields(stdout) + existingNodes, err := oc.kube.GetNodes() + if err != nil { + klog.Errorf("Error deleting egressFirewall for namespace %s, cannot get nodes to delete ACLS %v", + egressFirewall.Namespace, err) + } + for _, uuid := range uuids { + for _, node := range existingNodes.Items { + _, stderr, err := util.RunOVNNbctl("remove", "logical_switch", + joinSwitch(node.Name), "acls", uuid) + if err != nil { + klog.Errorf("remove failed to delete the rule for "+ + "address_set=%s, stderr: %q (%v)", nsInfo.addressSet.GetHashName(), stderr, err) + } + } + } +} + +func (ef *egressFirewall) addACLToJoinSwitch(joinSwitch, hashedAddressSetName string) error { + for _, rule := range ef.egressRules { + var match string + var action string + if rule.access == egressfirewallapi.EgressFirewallRuleAllow { + action = "allow" + } else { + action = "drop" + } + + if len(rule.ports) == 0 { + match = fmt.Sprintf("match=\"ip4.dst == %s && ip4.src == $%s\"", rule.to.cidrSelector, hashedAddressSetName) + } else { + + match = fmt.Sprintf("match=\"ip4.dst == %s && ip4.src == $%s && ( %s )\"", rule.to.cidrSelector, hashedAddressSetName, egressGetL4Match(rule.ports)) + } + uuid, stderr, err := util.RunOVNNbctl("--data=bare", "--no-heading", + "--columns=_uuid", "find", "ACL", match, "action="+action, + fmt.Sprintf("external-ids:egressFirewall=%s", ef.namespace)) + + if err != nil { + return fmt.Errorf("executing find ACL command, stderr: %q, %+v", stderr, err) + } + if uuid == "" { + _, stderr, err := util.RunOVNNbctl("--id=@acl", "create", "acl", + fmt.Sprintf("priority=%d", defaultStartPriority-rule.id), + fmt.Sprintf("direction=%s", fromLport), match, "action="+action, + fmt.Sprintf("external-ids:egressFirewall=%s", ef.namespace), + "--", "add", "logical_switch", joinSwitch, + "acls", "@acl") + if err != nil { + return fmt.Errorf("executing create ACL command, stderr: %q, %+v", stderr, err) + } + } else { + _, stderr, err := util.RunOVNNbctl("add", "logical_switch", joinSwitch, "acls", uuid) + if err != nil { + return fmt.Errorf("executing create ACL command, stderr: %q, %+v", stderr, err) + + } + } + + } + return nil +} + +func egressGetL4Match(ports []*port) string { + var udpString string + var tcpString string + var sctpString string + for _, port := range ports { + if port.protocol != nil && *port.protocol == "udp" && udpString != "udp" { + if port.portNum == nil { + udpString = "udp" + } else { + udpString = fmt.Sprintf("%s udp.dst == %d ||", udpString, *port.portNum) + } + } else if port.protocol != nil && *port.protocol == "tcp" && tcpString != "tcp" { + if port.portNum == nil { + tcpString = "tcp" + } else { + tcpString = fmt.Sprintf("%s tcp.dst == %d ||", tcpString, *port.portNum) + } + } else if port.protocol != nil && *port.protocol == "sctp" && sctpString != "sctp" { + if port.portNum == nil { + sctpString = "sctp" + } else { + sctpString = fmt.Sprintf("%s sctp.dst == %d ||", sctpString, *port.portNum) + } + } + } + // build the l4 match + var l4Match string + type tuple struct { + protocolName string + protocolFormated string + } + list := []tuple{ + { + protocolName: "udp", + protocolFormated: udpString, + }, + { + protocolName: "tcp", + protocolFormated: tcpString, + }, + { + protocolName: "sctp", + protocolFormated: sctpString, + }, + } + for _, entry := range list { + if entry.protocolName == entry.protocolFormated { + if l4Match == "" { + l4Match = fmt.Sprintf("(%s)", entry.protocolName) + } else { + l4Match = fmt.Sprintf("%s || (%s)", l4Match, entry.protocolName) + } + } else { + if l4Match == "" && entry.protocolFormated != "" { + l4Match = fmt.Sprintf("(%s && (%s))", entry.protocolName, entry.protocolFormated[:len(entry.protocolFormated)-2]) + } else if entry.protocolFormated != "" { + l4Match = fmt.Sprintf("%s || (%s && (%s))", l4Match, entry.protocolName, entry.protocolFormated[:len(entry.protocolFormated)-2]) + } + } + } + return l4Match +} + +func joinSwitch(nodeName string) string { + return fmt.Sprintf("join_%s", nodeName) +} diff --git a/go-controller/pkg/ovn/egressfirewall_test.go b/go-controller/pkg/ovn/egressfirewall_test.go new file mode 100644 index 00000000000..5271edcdfb8 --- /dev/null +++ b/go-controller/pkg/ovn/egressfirewall_test.go @@ -0,0 +1,504 @@ +package ovn + +import ( + "fmt" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + // "k8s.io/client-go/kubernetes/fake" + + "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/config" + egressfirewallapi "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1" + //egressfirewallfake "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1/apis/clientset/versioned/fake" + //"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/factory" + ovntest "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/testing" + "github.com/urfave/cli/v2" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" +) + +//func newEgressFirewallMeta(name, namespace string) metav1.ObjectMeta { +func newObjectMeta(name, namespace string) metav1.ObjectMeta { + return metav1.ObjectMeta{ + UID: types.UID(namespace), + Name: name, + Namespace: namespace, + } + +} + +func newEgressFirewallObject(name, namespace string, egressRules []egressfirewallapi.EgressFirewallRule) *egressfirewallapi.EgressFirewall { + + return &egressfirewallapi.EgressFirewall{ + ObjectMeta: newObjectMeta(name, namespace), + Spec: egressfirewallapi.EgressFirewallSpec{ + Egress: egressRules, + }, + } +} + +var _ = Describe("OVN EgressFirewall Operations", func() { + var ( + app *cli.App + fakeOVN *FakeOVN + fExec *ovntest.FakeExec + ) + + BeforeEach(func() { + // Restore global default values before each testcase + config.PrepareTestConfig() + + app = cli.NewApp() + app.Name = "test" + app.Flags = config.Flags + + fExec = ovntest.NewLooseCompareFakeExec() + fakeOVN = NewFakeOVN(fExec) + }) + + AfterEach(func() { + fakeOVN.shutdown() + }) + + Context("on startup", func() { + It("reconciles an existing egressFirewall", func() { + app.Action = func(ctx *cli.Context) error { + const ( + node1Name string = "node1" + ) + fExec.AddFakeCmdsNoOutputNoError([]string{ + "ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find ACL match=\"ip4.dst == 1.2.3.4/23 && ip4.src == $a6953372168492035427\" action=allow external-ids:egressFirewall=namespace1", + "ovn-nbctl --timeout=15 --id=@acl create acl priority=2000 direction=from-lport match=\"ip4.dst == 1.2.3.4/23 && ip4.src == $a6953372168492035427\" action=allow external-ids:egressFirewall=namespace1 -- add logical_switch join_node1 acls @acl", + }) + + namespace1 := *newNamespace("namespace1") + egressFirewall := newEgressFirewallObject("default", namespace1.Name, []egressfirewallapi.EgressFirewallRule{ + { + Type: "Allow", + To: egressfirewallapi.EgressFirewallDestination{ + CIDRSelector: "1.2.3.4/23", + }, + }, + }) + + fakeOVN.start(ctx, &egressfirewallapi.EgressFirewallList{ + Items: []egressfirewallapi.EgressFirewall{ + *egressFirewall, + }, + }, &v1.NamespaceList{ + Items: []v1.Namespace{ + namespace1, + }, + }, &v1.NodeList{ + Items: []v1.Node{ + { + Status: v1.NodeStatus{ + Phase: v1.NodeRunning, + }, + ObjectMeta: newObjectMeta(node1Name, ""), + }, + }, + }) + fakeOVN.controller.WatchNamespaces() + fakeOVN.controller.WatchEgressFirewall() + + _, err := fakeOVN.fakeEgressClient.K8sV1().EgressFirewalls(egressFirewall.Namespace).Get(egressFirewall.Name, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + + Eventually(fExec.CalledMatchesExpected).Should(BeTrue(), fExec.ErrorDesc) + + return nil + } + + err := app.Run([]string{app.Name}) + Expect(err).NotTo(HaveOccurred()) + + }) + }) + Context("during execution", func() { + It("correctly creates an egressfirewall denying traffic all udp traffic", func() { + app.Action = func(ctx *cli.Context) error { + const ( + node1Name string = "node1" + ) + fExec.AddFakeCmdsNoOutputNoError([]string{ + "ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find ACL match=\"ip4.dst == 1.2.3.4/23 && ip4.src == $a6953372168492035427 && ( (udp) )\" action=drop external-ids:egressFirewall=namespace1", + "ovn-nbctl --timeout=15 --id=@acl create acl priority=2000 direction=from-lport match=\"ip4.dst == 1.2.3.4/23 && ip4.src == $a6953372168492035427 && ( (udp) )\" action=drop external-ids:egressFirewall=namespace1 -- add logical_switch join_node1 acls @acl", + }) + namespace1 := *newNamespace("namespace1") + egressFirewall := newEgressFirewallObject("default", namespace1.Name, []egressfirewallapi.EgressFirewallRule{ + { + Type: "Deny", + Ports: []egressfirewallapi.EgressFirewallPort{ + { + Protocol: pointerToString("udp"), + }, + }, + To: egressfirewallapi.EgressFirewallDestination{ + CIDRSelector: "1.2.3.4/23", + }, + }, + }) + fakeOVN.start(ctx, nil, + &v1.NamespaceList{ + Items: []v1.Namespace{ + namespace1, + }, + }, &v1.NodeList{ + Items: []v1.Node{ + { + Status: v1.NodeStatus{ + Phase: v1.NodeRunning, + }, + ObjectMeta: newObjectMeta(node1Name, ""), + }, + }, + }) + fakeOVN.controller.WatchNamespaces() + _, err := fakeOVN.fakeEgressClient.K8sV1().EgressFirewalls(egressFirewall.Namespace).Create(egressFirewall) + Expect(err).NotTo(HaveOccurred()) + + fakeOVN.controller.WatchEgressFirewall() + + Eventually(fExec.CalledMatchesExpected).Should(BeTrue(), fExec.ErrorDesc) + + return nil + } + err := app.Run([]string{app.Name}) + Expect(err).NotTo(HaveOccurred()) + }) + It("correctly deletes an egressfirewall", func() { + app.Action = func(ctx *cli.Context) error { + const ( + node1Name string = "node1" + ) + fExec.AddFakeCmdsNoOutputNoError([]string{ + "ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find ACL match=\"ip4.dst == 1.2.3.5/23 && " + + "ip4.src == $a6953372168492035427 && ( (udp && ( udp.dst == 100 )) || (tcp) )\" action=allow external-ids:egressFirewall=namespace1", + "ovn-nbctl --timeout=15 --id=@acl create acl priority=2000 direction=from-lport match=\"ip4.dst == 1.2.3.5/23 && " + + "ip4.src == $a6953372168492035427 && ( (udp && ( udp.dst == 100 )) || (tcp) )\" action=allow external-ids:egressFirewall=namespace1 -- add logical_switch join_node1 acls @acl", + fmt.Sprintf("ovn-nbctl --timeout=15 remove logical_switch join_node1 acls %s", fakeUUID), + }) + fExec.AddFakeCmd(&ovntest.ExpectedCmd{ + Cmd: "ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find ACL external-ids:egressFirewall=namespace1", + Output: fakeUUID, + }) + + namespace1 := *newNamespace("namespace1") + egressFirewall := newEgressFirewallObject("default", namespace1.Name, []egressfirewallapi.EgressFirewallRule{ + { + Type: "Allow", + Ports: []egressfirewallapi.EgressFirewallPort{ + { + Protocol: pointerToString("tcp"), + }, + { + Protocol: pointerToString("udp"), + PortNum: pointerInt32(100), + }, + }, + To: egressfirewallapi.EgressFirewallDestination{ + CIDRSelector: "1.2.3.5/23", + }, + }, + }) + + fakeOVN.start(ctx, &egressfirewallapi.EgressFirewallList{ + Items: []egressfirewallapi.EgressFirewall{ + *egressFirewall, + }, + }, &v1.NamespaceList{ + Items: []v1.Namespace{ + namespace1, + }, + }, &v1.NodeList{ + Items: []v1.Node{ + { + Status: v1.NodeStatus{ + Phase: v1.NodeRunning, + }, + ObjectMeta: newObjectMeta(node1Name, ""), + }, + }, + }) + fakeOVN.controller.WatchNamespaces() + fakeOVN.controller.WatchEgressFirewall() + + err := fakeOVN.fakeEgressClient.K8sV1().EgressFirewalls(egressFirewall.Namespace).Delete(egressFirewall.Name, metav1.NewDeleteOptions(0)) + Expect(err).NotTo(HaveOccurred()) + + Eventually(fExec.CalledMatchesExpected).Should(BeTrue(), fExec.ErrorDesc) + + return nil + } + + err := app.Run([]string{app.Name}) + Expect(err).NotTo(HaveOccurred()) + }) + It("correctly updates an egressfirewall", func() { + app.Action = func(ctx *cli.Context) error { + const ( + node1Name string = "node1" + ) + fExec.AddFakeCmdsNoOutputNoError([]string{ + "ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find ACL match=\"ip4.dst == 1.2.3.4/23 && ip4.src == $a6953372168492035427\" action=allow external-ids:egressFirewall=namespace1", + "ovn-nbctl --timeout=15 --id=@acl create acl priority=2000 direction=from-lport match=\"ip4.dst == 1.2.3.4/23 && ip4.src == $a6953372168492035427\" action=allow external-ids:egressFirewall=namespace1 -- add logical_switch join_node1 acls @acl", + fmt.Sprintf("ovn-nbctl --timeout=15 remove logical_switch join_node1 acls %s", fakeUUID), + "ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find ACL match=\"ip4.dst == 1.2.3.4/23 && ip4.src == $a6953372168492035427\" action=drop external-ids:egressFirewall=namespace1", + "ovn-nbctl --timeout=15 --id=@acl create acl priority=2000 direction=from-lport match=\"ip4.dst == 1.2.3.4/23 && ip4.src == $a6953372168492035427\" action=drop external-ids:egressFirewall=namespace1 -- add logical_switch join_node1 acls @acl", + }) + fExec.AddFakeCmd(&ovntest.ExpectedCmd{ + Cmd: "ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find ACL external-ids:egressFirewall=namespace1", + Output: fakeUUID, + }) + + namespace1 := *newNamespace("namespace1") + egressFirewall := newEgressFirewallObject("default", namespace1.Name, []egressfirewallapi.EgressFirewallRule{ + { + Type: "Allow", + To: egressfirewallapi.EgressFirewallDestination{ + CIDRSelector: "1.2.3.4/23", + }, + }, + }) + egressFirewall1 := newEgressFirewallObject("default", namespace1.Name, []egressfirewallapi.EgressFirewallRule{ + { + Type: "Deny", + To: egressfirewallapi.EgressFirewallDestination{ + CIDRSelector: "1.2.3.4/23", + }, + }, + }) + + fakeOVN.start(ctx, &egressfirewallapi.EgressFirewallList{ + Items: []egressfirewallapi.EgressFirewall{ + *egressFirewall, + }, + }, &v1.NamespaceList{ + Items: []v1.Namespace{ + namespace1, + }, + }, &v1.NodeList{ + Items: []v1.Node{ + { + Status: v1.NodeStatus{ + Phase: v1.NodeRunning, + }, + ObjectMeta: newObjectMeta(node1Name, ""), + }, + }, + }) + fakeOVN.controller.WatchNamespaces() + fakeOVN.controller.WatchEgressFirewall() + + _, err := fakeOVN.fakeEgressClient.K8sV1().EgressFirewalls(egressFirewall.Namespace).Get(egressFirewall.Name, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + _, err = fakeOVN.fakeEgressClient.K8sV1().EgressFirewalls(egressFirewall1.Namespace).Update(egressFirewall1) + Expect(err).NotTo(HaveOccurred()) + + Eventually(fExec.CalledMatchesExpected).Should(BeTrue(), fExec.ErrorDesc) + + return nil + } + + err := app.Run([]string{app.Name}) + Expect(err).NotTo(HaveOccurred()) + + }) + It("correctly adds an existing egressFirewall to a new node", func() { + app.Action = func(ctx *cli.Context) error { + const ( + node1Name string = "node1" + ) + stopChan := make(chan struct{}) + defer close(stopChan) + fExec.AddFakeCmdsNoOutputNoError([]string{ + //adding the original node commands + "ovn-sbctl --timeout=15 --data=bare --no-heading --columns=name,hostname --format=json list Chassis", + "ovn-nbctl --timeout=15 --data=bare --no-heading --columns=name,other-config find logical_switch other-config:subnet!=_", + "ovn-nbctl --timeout=15 --if-exists lrp-del rtos-node1 -- lrp-add ovn_cluster_router rtos-node1 ", + "ovn-nbctl --timeout=15 --may-exist ls-add node1 -- set logical_switch node1", + "ovn-nbctl --timeout=15 set logical_switch node1 other-config:mcast_snoop=\"true\"", + "ovn-nbctl --timeout=15 set logical_switch node1 other-config:mcast_querier=\"false\"", + "ovn-nbctl --timeout=15 -- --may-exist lsp-add node1 stor-node1 -- set logical_switch_port stor-node1 type=router options:router-port=rtos-node1 addresses=\"\"", + "ovn-nbctl --timeout=15 set logical_switch node1 load_balancer=fakeTCPLoadBalancerUUID", + "ovn-nbctl --timeout=15 add logical_switch node1 load_balancer fakeUDPLoadBalancerUUID", + "ovn-nbctl --timeout=15 -- --if-exists lsp-del k8s-node1", + //adding the new node + "ovn-nbctl --timeout=15 --if-exists lrp-del rtos-newNode -- lrp-add ovn_cluster_router rtos-newNode ", + "ovn-nbctl --timeout=15 --may-exist ls-add newNode -- set logical_switch newNode", + "ovn-nbctl --timeout=15 set logical_switch newNode other-config:mcast_snoop=\"true\"", + "ovn-nbctl --timeout=15 set logical_switch newNode other-config:mcast_querier=\"false\"", + "ovn-nbctl --timeout=15 -- --may-exist lsp-add newNode stor-newNode -- set logical_switch_port stor-newNode type=router options:router-port=rtos-newNode addresses=\"\"", + "ovn-nbctl --timeout=15 set logical_switch newNode load_balancer=fakeTCPLoadBalancerUUID", + "ovn-nbctl --timeout=15 add logical_switch newNode load_balancer fakeUDPLoadBalancerUUID", + "ovn-nbctl --timeout=15 -- --if-exists lsp-del k8s-newNode", + }) + + fExec.AddFakeCmdsNoOutputNoError([]string{ + //adding the original egressFirewall + "ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find ACL match=\"ip4.dst == 1.2.3.4/23 && ip4.src == $a6953372168492035427\" action=allow external-ids:egressFirewall=namespace1", + "ovn-nbctl --timeout=15 --id=@acl create acl priority=2000 direction=from-lport match=\"ip4.dst == 1.2.3.4/23 && ip4.src == $a6953372168492035427\" action=allow external-ids:egressFirewall=namespace1 -- add logical_switch join_node1 acls @acl", + "ovn-nbctl --timeout=15 add logical_switch join_newNode acls " + fakeUUID, + }) + fExec.AddFakeCmd(&ovntest.ExpectedCmd{ + //query ovn and get the UUID of the original ACL + Cmd: "ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find ACL match=\"ip4.dst == 1.2.3.4/23 && ip4.src == $a6953372168492035427\" action=allow external-ids:egressFirewall=namespace1", + Output: fakeUUID, + }) + + namespace1 := *newNamespace("namespace1") + egressFirewall := newEgressFirewallObject("default", namespace1.Name, []egressfirewallapi.EgressFirewallRule{ + { + Type: "Allow", + To: egressfirewallapi.EgressFirewallDestination{ + CIDRSelector: "1.2.3.4/23", + }, + }, + }) + newNode := &v1.Node{ + ObjectMeta: newObjectMeta("newNode", ""), + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{ + {Type: v1.NodeInternalIP, Address: "10.0.0.0"}, + }, + }, + } + + fakeOVN.start(ctx, &egressfirewallapi.EgressFirewallList{ + Items: []egressfirewallapi.EgressFirewall{ + *egressFirewall, + }, + }, &v1.NamespaceList{ + Items: []v1.Namespace{ + namespace1, + }, + }, &v1.NodeList{ + Items: []v1.Node{ + { + Status: v1.NodeStatus{ + Phase: v1.NodeRunning, + }, + ObjectMeta: newObjectMeta(node1Name, ""), + }, + }, + }) + fakeOVN.controller.TCPLoadBalancerUUID = "fakeTCPLoadBalancerUUID" + fakeOVN.controller.UDPLoadBalancerUUID = "fakeUDPLoadBalancerUUID" + fakeOVN.controller.SCTPLoadBalancerUUID = "fakeSTCPLoadBalancerUUID" + fakeOVN.controller.WatchNodes() + fakeOVN.controller.WatchNamespaces() + fakeOVN.controller.WatchEgressFirewall() + + _, err := fakeOVN.fakeEgressClient.K8sV1().EgressFirewalls(egressFirewall.Namespace).Get(egressFirewall.Name, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + + _, err = fakeOVN.fakeClient.CoreV1().Nodes().Create(newNode) + Expect(err).NotTo(HaveOccurred()) + + Eventually(fExec.CalledMatchesExpected).Should(BeTrue(), fExec.ErrorDesc) + + return nil + } + + err := app.Run([]string{app.Name}) + Expect(err).NotTo(HaveOccurred()) + }) + + }) + +}) + +func pointerInt32(x int32) *int32 { + return &x +} + +func pointerToString(x string) *string { + return &x +} + +var _ = Describe("OVN test basic functions", func() { + + It("computes correct L4Match", func() { + type testcase struct { + ports []*port + expectedMatch string + } + testcases := []testcase{ + { + expectedMatch: "", + }, + { + ports: []*port{ + { + protocol: pointerToString("tcp"), + }, + }, + expectedMatch: "(tcp)", + }, + { + ports: []*port{ + { + protocol: pointerToString("udp"), + }, + }, + expectedMatch: "(udp)", + }, + { + ports: []*port{ + { + protocol: pointerToString("sctp"), + }, + }, + expectedMatch: "(sctp)", + }, + { + ports: []*port{ + { + protocol: pointerToString("tcp"), + portNum: pointerInt32(100), + }, + }, + expectedMatch: "(tcp && ( tcp.dst == 100 ))", + }, + { + ports: []*port{ + { + protocol: pointerToString("tcp"), + portNum: pointerInt32(100), + }, + { + protocol: pointerToString("udp"), + }, + }, + expectedMatch: "(udp) || (tcp && ( tcp.dst == 100 ))", + }, + { + ports: []*port{ + { + protocol: pointerToString("tcp"), + portNum: pointerInt32(100), + }, + { + protocol: pointerToString("sctp"), + portNum: pointerInt32(13), + }, + { + protocol: pointerToString("tcp"), + portNum: pointerInt32(102), + }, + { + protocol: pointerToString("udp"), + portNum: pointerInt32(400), + }, + }, + expectedMatch: "(udp && ( udp.dst == 400 )) || (tcp && ( tcp.dst == 100 || tcp.dst == 102 )) || (sctp && ( sctp.dst == 13 ))", + }, + } + for _, test := range testcases { + l4Match := egressGetL4Match(test.ports) + Expect(test.expectedMatch).To(Equal(l4Match)) + } + }) +}) diff --git a/go-controller/pkg/ovn/ovn.go b/go-controller/pkg/ovn/ovn.go index 7a4c6ca1674..71b5f711df0 100644 --- a/go-controller/pkg/ovn/ovn.go +++ b/go-controller/pkg/ovn/ovn.go @@ -17,6 +17,8 @@ import ( "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/ovn/subnetallocator" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util" + egressfirewall "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1" + kapi "k8s.io/api/core/v1" kapisnetworking "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -62,12 +64,18 @@ type namespaceInfo struct { // the policy itself. networkPolicies map[string]*namespacePolicy + //defines the namespace egressFirewallRules + egressFirewallPolicy *egressFirewall + hybridOverlayExternalGW net.IP hybridOverlayVTEP net.IP // The UUID of the namespace-wide port group that contains all the pods in the namespace. portGroupUUID string + // Returns true if there is an egressFirewall Associated with this namespace + egressFirewall bool + multicastEnabled bool } @@ -203,7 +211,7 @@ func (oc *Controller) Run() error { } for _, f := range []func() error{oc.WatchNamespaces, oc.WatchPods, oc.WatchServices, - oc.WatchEndpoints, oc.WatchNetworkPolicy} { + oc.WatchEndpoints, oc.WatchNetworkPolicy, oc.WatchEgressFirewall} { if err := f(); err != nil { return err } @@ -550,6 +558,30 @@ func (oc *Controller) WatchNetworkPolicy() error { return err } +// WatchEgressFirewall starts the watching of egressfirewall resource and calls +// back the appropriate handler logic +func (oc *Controller) WatchEgressFirewall() error { + _, err := oc.watchFactory.AddEgressFirewallHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + egressFirewall := obj.(*egressfirewall.EgressFirewall) + klog.Errorf("KEYWORD: ADDING NEW EGRESSFIREWALL") + oc.addEgressFirewall(egressFirewall) + }, + UpdateFunc: func(old, newer interface{}) { + klog.Errorf("KEYWORD: UPDATING EXISTING EGRESSFIREWALL") + newEgressFirewall := newer.(*egressfirewall.EgressFirewall) + oldEgressFirewall := old.(*egressfirewall.EgressFirewall) + oc.updateEgressFirewall(oldEgressFirewall, newEgressFirewall) + }, + DeleteFunc: func(obj interface{}) { + klog.Errorf("KEYWORD: DELEING EXISTING EGRESSFIREWALL") + egressFirewall := obj.(*egressfirewall.EgressFirewall) + oc.deleteEgressFirewall(egressFirewall) + }, + }, nil) + return err +} + // WatchNamespaces starts the watching of namespace resource and calls // back the appropriate handler logic func (oc *Controller) WatchNamespaces() error { @@ -635,6 +667,29 @@ func (oc *Controller) WatchNodes() error { klog.Warningf(err.Error()) gatewaysFailed.Store(node.Name, true) } + + //add any existing egressFirewall objects to join switch + namespaceList, err := oc.watchFactory.GetNamespaces() + if err != nil { + klog.Errorf("error getting list of namespaces when adding node: %s", node.Name) + } + klog.Errorf("KEYWORD: %s - %s\n", node.Name, namespaceList) + for _, namespace := range namespaceList { + nsInfo, err := oc.waitForNamespaceLocked(namespace.Name) + if err != nil { + klog.Errorf("failed to wait for namespace %s event (%v)", + namespace.Name, err) + continue + } + if nsInfo.egressFirewallPolicy != nil { + err = nsInfo.egressFirewallPolicy.addACLToJoinSwitch(joinSwitch(node.Name), nsInfo.addressSet.GetHashName()) + if err != nil { + klog.Errorf("%s", err) + } + } + + nsInfo.Unlock() + } }, UpdateFunc: func(old, new interface{}) { oldNode := old.(*kapi.Node) diff --git a/go-controller/pkg/util/kube.go b/go-controller/pkg/util/kube.go index c556694c5de..dbf3157f2a8 100644 --- a/go-controller/pkg/util/kube.go +++ b/go-controller/pkg/util/kube.go @@ -15,13 +15,15 @@ import ( "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/cert" + egressfirewallclientset "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressfirewall/v1/apis/clientset/versioned" + "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/cni/types" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/config" ) // NewClientset creates a Kubernetes clientset from either a kubeconfig, // TLS properties, or an apiserver URL -func NewClientset(conf *config.KubernetesConfig) (*kubernetes.Clientset, error) { +func NewClientset(conf *config.KubernetesConfig) (*kubernetes.Clientset, *egressfirewallclientset.Clientset, error) { var kconfig *rest.Config var err error @@ -31,7 +33,7 @@ func NewClientset(conf *config.KubernetesConfig) (*kubernetes.Clientset, error) } else if strings.HasPrefix(conf.APIServer, "https") { // TODO: Looks like the check conf.APIServer is redundant and can be removed if conf.APIServer == "" || conf.Token == "" { - return nil, fmt.Errorf("TLS-secured apiservers require token and CA certificate") + return nil, nil, fmt.Errorf("TLS-secured apiservers require token and CA certificate") } kconfig = &rest.Config{ Host: conf.APIServer, @@ -39,7 +41,7 @@ func NewClientset(conf *config.KubernetesConfig) (*kubernetes.Clientset, error) } if conf.CACert != "" { if _, err := cert.NewPool(conf.CACert); err != nil { - return nil, err + return nil, nil, err } kconfig.TLSClientConfig = rest.TLSClientConfig{CAFile: conf.CACert} } @@ -52,13 +54,22 @@ func NewClientset(conf *config.KubernetesConfig) (*kubernetes.Clientset, error) kconfig, err = rest.InClusterConfig() } if err != nil { - return nil, err + return nil, nil, err } kconfig.AcceptContentTypes = "application/vnd.kubernetes.protobuf,application/json" kconfig.ContentType = "application/vnd.kubernetes.protobuf" - return kubernetes.NewForConfig(kconfig) + kClientset, err := kubernetes.NewForConfig(kconfig) + if err != nil { + return nil, nil, err + } + egressFirewallClientset, err := egressfirewallclientset.NewForConfig(kconfig) + if err != nil { + return nil, nil, err + } + + return kClientset, egressFirewallClientset, nil } // IsClusterIPSet checks if the service is an headless service or not