diff --git a/go-controller/pkg/ovn/base_network_controller_pods.go b/go-controller/pkg/ovn/base_network_controller_pods.go index 40092b83b77..ce22d1c6c35 100644 --- a/go-controller/pkg/ovn/base_network_controller_pods.go +++ b/go-controller/pkg/ovn/base_network_controller_pods.go @@ -214,32 +214,29 @@ func (bnc *BaseNetworkController) deletePodLogicalPort(pod *kapi.Pod, portInfo * // check to make sure no other pods are using this IP before we try to release it if this is a completed pod. if util.PodCompleted(pod) { if shouldRelease, err = bnc.lsManager.ConditionalIPRelease(switchName, podIfAddrs, func() (bool, error) { - pods, err := bnc.watchFactory.GetAllPods() + + // Ignore pods on other switches + if expectedSwitchName != switchName { + return true, nil + } + + var needleIPs []net.IP + for _, podIPNet := range podIfAddrs { + needleIPs = append(needleIPs, podIPNet.IP) + } + + collidingPod, err := bnc.findPodWithIPAddresses(needleIPs) if err != nil { - return false, fmt.Errorf("unable to get pods to determine if completed pod IP is in use by another pod. "+ - "Will not release pod %s/%s IP: %#v from allocator", pod.Namespace, pod.Name, podIfAddrs) + return false, fmt.Errorf("unable to determine if completed pod IP is in use by another pod. "+ + "Will not release pod %s/%s IP: %#v from allocator. %v", pod.Namespace, pod.Name, podIfAddrs, err) } - // iterate through all pods, ignore pods on other switches - for _, p := range pods { - if util.PodCompleted(p) || util.PodWantsHostNetwork(p) || !util.PodScheduled(p) || expectedSwitchName != switchName { - continue - } - // check if the pod addresses match in the OVN annotation - pAddrs, err := util.GetPodIPsOfNetwork(p, bnc.NetInfo) - if err != nil { - continue - } - for _, pAddr := range pAddrs { - for _, podAddr := range podIfAddrs { - if pAddr.Equal(podAddr.IP) { - klog.Infof("Will not release IP address: %s for %s. Detected another pod"+ - " using this IP: %s/%s", pAddr.String(), podDesc, p.Namespace, p.Name) - return false, nil - } - } - } + if collidingPod != nil { + klog.Infof("Will not release IP address: %s for %s. Detected another pod"+ + " using this IP: %s/%s", util.JoinIPNetIPs(podIfAddrs, " "), podDesc, collidingPod.Namespace, collidingPod.Name) + return false, nil } + klog.Infof("Releasing IPs for Completed pod: %s/%s, ips: %s", pod.Namespace, pod.Name, util.JoinIPNetIPs(podIfAddrs, " ")) return true, nil @@ -291,6 +288,35 @@ func (bnc *BaseNetworkController) deletePodLogicalPort(pod *kapi.Pod, portInfo * return &pInfo, nil } +func (bnc *BaseNetworkController) findPodWithIPAddresses(needleIPs []net.IP) (*kapi.Pod, error) { + allPods, err := bnc.watchFactory.GetAllPods() + if err != nil { + return nil, fmt.Errorf("unable to get pods: %w", err) + } + + // iterate through all pods + for _, p := range allPods { + if util.PodCompleted(p) || util.PodWantsHostNetwork(p) || !util.PodScheduled(p) { + continue + } + // check if the pod addresses match in the OVN annotation + haystackPodAddrs, err := util.GetPodIPsOfNetwork(p, bnc.NetInfo) + if err != nil { + continue + } + + for _, haystackPodAddr := range haystackPodAddrs { + for _, needleIP := range needleIPs { + if haystackPodAddr.Equal(needleIP) { + return p, nil + } + } + } + } + + return nil, nil +} + func (bnc *BaseNetworkController) releasePodIPs(pInfo *lpInfo) error { if err := bnc.lsManager.ReleaseIPs(pInfo.logicalSwitch, pInfo.ips); err != nil { if !errors.Is(err, logicalswitchmanager.SwitchNotFound) { diff --git a/go-controller/pkg/ovn/default_network_controller.go b/go-controller/pkg/ovn/default_network_controller.go index 007e5fba4df..9e05d7d6d64 100644 --- a/go-controller/pkg/ovn/default_network_controller.go +++ b/go-controller/pkg/ovn/default_network_controller.go @@ -934,7 +934,7 @@ func (h *defaultNetworkControllerEventHandler) DeleteResource(obj, cachedObj int case factory.PeerPodSelectorType: extraParameters := h.extraParameters.(*NetworkPolicyExtraParameters) - return h.oc.handlePeerPodSelectorDelete(extraParameters.np, extraParameters.gp, obj) + return h.oc.handlePeerPodSelectorDelete(extraParameters.np, extraParameters.gp, extraParameters.podSelector, obj) case factory.PeerNamespaceAndPodSelectorType: extraParameters := h.extraParameters.(*NetworkPolicyExtraParameters) @@ -942,7 +942,7 @@ func (h *defaultNetworkControllerEventHandler) DeleteResource(obj, cachedObj int case factory.PeerPodForNamespaceAndPodSelectorType: extraParameters := h.extraParameters.(*NetworkPolicyExtraParameters) - return h.oc.handlePeerPodSelectorDelete(extraParameters.np, extraParameters.gp, obj) + return h.oc.handlePeerPodSelectorDelete(extraParameters.np, extraParameters.gp, extraParameters.podSelector, obj) case factory.PeerNamespaceSelectorType: extraParameters := h.extraParameters.(*NetworkPolicyExtraParameters) diff --git a/go-controller/pkg/ovn/policy.go b/go-controller/pkg/ovn/policy.go index 70b1d671122..863715381ba 100644 --- a/go-controller/pkg/ovn/policy.go +++ b/go-controller/pkg/ovn/policy.go @@ -1393,7 +1393,7 @@ func (oc *DefaultNetworkController) handlePeerPodSelectorAddUpdate(np *networkPo // handlePeerPodSelectorDelete removes the IP address of a pod that no longer // matches a NetworkPolicy ingress/egress section's selectors from that // ingress/egress address set -func (oc *DefaultNetworkController) handlePeerPodSelectorDelete(np *networkPolicy, gp *gressPolicy, obj interface{}) error { +func (oc *DefaultNetworkController) handlePeerPodSelectorDelete(np *networkPolicy, gp *gressPolicy, podSelector labels.Selector, obj interface{}) error { np.RLock() defer np.RUnlock() if np.deleted { @@ -1404,6 +1404,29 @@ func (oc *DefaultNetworkController) handlePeerPodSelectorDelete(np *networkPolic klog.Infof("Pod %s/%s not scheduled on any node, skipping it", pod.Namespace, pod.Name) return nil } + + if util.PodCompleted(pod) { + ips, err := util.GetPodIPsOfNetwork(pod, &util.DefaultNetInfo{}) + if err != nil { + return fmt.Errorf("can't get pod IPs %s/%s: %w", pod.Namespace, pod.Name, err) + } + + collidingPod, err := oc.findPodWithIPAddresses(ips) + if err != nil { + return fmt.Errorf("lookup for pods with the same IPs [%s] failed: %w", util.JoinIPs(ips, " "), err) + } + + if collidingPod != nil { + + // If the IP is used by another Pod that is targeted by the same network policy, don't remove the IP from the Address_Set + if podSelector.Matches(labels.Set(collidingPod.Labels)) { + klog.Infof("Not deleting Pod %s/%s IPs [%s] as they are used by %s/%s", pod.Namespace, pod.Name, + util.JoinIPs(ips, " "), collidingPod.Namespace, collidingPod.Name) + return nil + } + } + } + // gressPolicy.deletePeerPod must be called with networkPolicy RLock. if err := gp.deletePeerPod(pod); err != nil { return err @@ -1439,8 +1462,9 @@ func (oc *DefaultNetworkController) addPeerPodHandler(podSelector *metav1.LabelS factory.PeerPodSelectorType, syncFunc, &NetworkPolicyExtraParameters{ - np: np, - gp: gp, + np: np, + gp: gp, + podSelector: sel, }) podHandler, err := retryPeerPods.WatchResourceFiltered(namespace, sel) @@ -1477,8 +1501,9 @@ func (oc *DefaultNetworkController) handlePeerNamespaceAndPodAdd(np *networkPoli factory.PeerPodForNamespaceAndPodSelectorType, syncFunc, &NetworkPolicyExtraParameters{ - gp: gp, - np: np, + gp: gp, + np: np, + podSelector: podSelector, }, ) // syncFunc and factory.PeerPodForNamespaceAndPodSelectorType add event handler also take np.RLock, diff --git a/go-controller/pkg/ovn/policy_test.go b/go-controller/pkg/ovn/policy_test.go index 0a9859938f9..8b2fc3414a4 100644 --- a/go-controller/pkg/ovn/policy_test.go +++ b/go-controller/pkg/ovn/policy_test.go @@ -27,6 +27,7 @@ import ( "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types" "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util" + kapi "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" knet "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -1535,6 +1536,92 @@ var _ = ginkgo.Describe("OVN NetworkPolicy Operations", func() { gomega.Expect(err).NotTo(gomega.HaveOccurred()) }) + ginkgo.It("reconciles a completed and deleted pod whose IP has been assigned to a running pod", func() { + app.Action = func(ctx *cli.Context) error { + namespace1 := *newNamespace(namespaceName1) + nodeName := "node1" + + // Use simple allow-same-namespace network policy + networkPolicy := newNetworkPolicy("networkpolicy1", namespace1.Name, + metav1.LabelSelector{}, + []knet.NetworkPolicyIngressRule{{ + From: []knet.NetworkPolicyPeer{{ + PodSelector: &metav1.LabelSelector{}, + }}, + }}, + []knet.NetworkPolicyEgressRule{}) + + fakeOvn.startWithDBSetup(initialDB, + &v1.NamespaceList{ + Items: []v1.Namespace{ + namespace1, + }, + }, + &knet.NetworkPolicyList{ + Items: []knet.NetworkPolicy{ + *networkPolicy, + }, + }, + ) + + err := fakeOvn.controller.lsManager.AddSwitch( + nodeName, + getLogicalSwitchUUID(fakeOvn.controller.nbClient, nodeName), + []*net.IPNet{ovntest.MustParseIPNet("10.128.1.0/29")}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + fakeOvn.controller.WatchNamespaces() + fakeOvn.controller.WatchPods() + fakeOvn.controller.WatchNetworkPolicy() + + // Start a pod + completedPod, err := fakeOvn.fakeClient.KubeClient.CoreV1().Pods(namespace1.Name). + Create( + context.TODO(), + newPod(namespace1.Name, "completed-pod", nodeName, "10.128.1.3"), + metav1.CreateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + eventuallyExpectAddressSetsWithIP(fakeOvn, networkPolicy, "10.128.1.3") + + // Consume the entire node ip pool + fakeOvn.controller.lsManager.AllocateUntilFull(nodeName) + + // Mark the pod as Completed, so the "10.128.1.3" moves back to the allocatable pool + completedPod.Status.Phase = kapi.PodSucceeded + completedPod, err = fakeOvn.fakeClient.KubeClient.CoreV1().Pods(completedPod.Namespace). + Update(context.TODO(), completedPod, metav1.UpdateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + eventuallyExpectEmptyAddressSetsExist(fakeOvn, networkPolicy) + + // Spawn a pod with an IP address that collides with a completed pod + _, err = fakeOvn.fakeClient.KubeClient.CoreV1().Pods(namespace1.Name). + Create( + context.TODO(), + newPod(namespace1.Name, "running-pod", nodeName, "10.128.1.3"), + metav1.CreateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + eventuallyExpectAddressSetsWithIP(fakeOvn, networkPolicy, "10.128.1.3") + + // Simulate garbage collector: deletes all completed pods + err = fakeOvn.fakeClient.KubeClient.CoreV1().Pods(completedPod.Namespace).Delete(context.TODO(), completedPod.Name, *metav1.NewDeleteOptions(0)) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + // Wait for every controller to be have finished its work + time.Sleep(200 * time.Millisecond) + + // Running pod policy should not be affected by pod deletions + eventuallyExpectAddressSetsWithIP(fakeOvn, networkPolicy, "10.128.1.3") + + return nil + } + + err := app.Run([]string{app.Name}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + }) + ginkgo.It("reconciles a deleted pod referenced by a networkpolicy in another namespace", func() { app.Action = func(ctx *cli.Context) error { namespace1 := *newNamespace(namespaceName1)