Skip to content

Commit

Permalink
Merge pull request ovn-org#1567 from zeeke/ocpbugs-5889-backup-413
Browse files Browse the repository at this point in the history
OCPBUGS-8741: [release-4.13] Handle Completed pods deletion
  • Loading branch information
openshift-merge-robot committed Mar 14, 2023
2 parents 3521f68 + eee6846 commit 117c69f
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 29 deletions.
70 changes: 48 additions & 22 deletions go-controller/pkg/ovn/base_network_controller_pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,32 +214,29 @@ func (bnc *BaseNetworkController) deletePodLogicalPort(pod *kapi.Pod, portInfo *
// check to make sure no other pods are using this IP before we try to release it if this is a completed pod.
if util.PodCompleted(pod) {
if shouldRelease, err = bnc.lsManager.ConditionalIPRelease(switchName, podIfAddrs, func() (bool, error) {
pods, err := bnc.watchFactory.GetAllPods()

// Ignore pods on other switches
if expectedSwitchName != switchName {
return true, nil
}

var needleIPs []net.IP
for _, podIPNet := range podIfAddrs {
needleIPs = append(needleIPs, podIPNet.IP)
}

collidingPod, err := bnc.findPodWithIPAddresses(needleIPs)
if err != nil {
return false, fmt.Errorf("unable to get pods to determine if completed pod IP is in use by another pod. "+
"Will not release pod %s/%s IP: %#v from allocator", pod.Namespace, pod.Name, podIfAddrs)
return false, fmt.Errorf("unable to determine if completed pod IP is in use by another pod. "+
"Will not release pod %s/%s IP: %#v from allocator. %v", pod.Namespace, pod.Name, podIfAddrs, err)
}
// iterate through all pods, ignore pods on other switches
for _, p := range pods {
if util.PodCompleted(p) || util.PodWantsHostNetwork(p) || !util.PodScheduled(p) || expectedSwitchName != switchName {
continue
}
// check if the pod addresses match in the OVN annotation
pAddrs, err := util.GetPodIPsOfNetwork(p, bnc.NetInfo)
if err != nil {
continue
}

for _, pAddr := range pAddrs {
for _, podAddr := range podIfAddrs {
if pAddr.Equal(podAddr.IP) {
klog.Infof("Will not release IP address: %s for %s. Detected another pod"+
" using this IP: %s/%s", pAddr.String(), podDesc, p.Namespace, p.Name)
return false, nil
}
}
}
if collidingPod != nil {
klog.Infof("Will not release IP address: %s for %s. Detected another pod"+
" using this IP: %s/%s", util.JoinIPNetIPs(podIfAddrs, " "), podDesc, collidingPod.Namespace, collidingPod.Name)
return false, nil
}

klog.Infof("Releasing IPs for Completed pod: %s/%s, ips: %s", pod.Namespace, pod.Name,
util.JoinIPNetIPs(podIfAddrs, " "))
return true, nil
Expand Down Expand Up @@ -291,6 +288,35 @@ func (bnc *BaseNetworkController) deletePodLogicalPort(pod *kapi.Pod, portInfo *
return &pInfo, nil
}

// findPodWithIPAddresses walks every pod returned by the watch factory and
// returns the first one whose addresses on this controller's network
// (bnc.NetInfo) contain any of needleIPs.
//
// It returns (nil, nil) when no pod matches. A non-nil error is returned only
// when the pod list itself cannot be retrieved.
func (bnc *BaseNetworkController) findPodWithIPAddresses(needleIPs []net.IP) (*kapi.Pod, error) {
	pods, listErr := bnc.watchFactory.GetAllPods()
	if listErr != nil {
		return nil, fmt.Errorf("unable to get pods: %w", listErr)
	}

	for _, candidate := range pods {
		// Completed, host-network and not-yet-scheduled pods are never
		// reported as holders of an address.
		switch {
		case util.PodCompleted(candidate),
			util.PodWantsHostNetwork(candidate),
			!util.PodScheduled(candidate):
			continue
		}

		// The candidate's addresses are read from its OVN annotation; a pod
		// whose addresses cannot be read is skipped instead of aborting the
		// whole lookup.
		candidateIPs, ipErr := util.GetPodIPsOfNetwork(candidate, bnc.NetInfo)
		if ipErr != nil {
			continue
		}

		for _, candidateIP := range candidateIPs {
			for _, wanted := range needleIPs {
				if candidateIP.Equal(wanted) {
					return candidate, nil
				}
			}
		}
	}

	return nil, nil
}

func (bnc *BaseNetworkController) releasePodIPs(pInfo *lpInfo) error {
if err := bnc.lsManager.ReleaseIPs(pInfo.logicalSwitch, pInfo.ips); err != nil {
if !errors.Is(err, logicalswitchmanager.SwitchNotFound) {
Expand Down
4 changes: 2 additions & 2 deletions go-controller/pkg/ovn/default_network_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -934,15 +934,15 @@ func (h *defaultNetworkControllerEventHandler) DeleteResource(obj, cachedObj int

case factory.PeerPodSelectorType:
extraParameters := h.extraParameters.(*NetworkPolicyExtraParameters)
return h.oc.handlePeerPodSelectorDelete(extraParameters.np, extraParameters.gp, obj)
return h.oc.handlePeerPodSelectorDelete(extraParameters.np, extraParameters.gp, extraParameters.podSelector, obj)

case factory.PeerNamespaceAndPodSelectorType:
extraParameters := h.extraParameters.(*NetworkPolicyExtraParameters)
return h.oc.handlePeerNamespaceAndPodDel(extraParameters.np, extraParameters.gp, obj)

case factory.PeerPodForNamespaceAndPodSelectorType:
extraParameters := h.extraParameters.(*NetworkPolicyExtraParameters)
return h.oc.handlePeerPodSelectorDelete(extraParameters.np, extraParameters.gp, obj)
return h.oc.handlePeerPodSelectorDelete(extraParameters.np, extraParameters.gp, extraParameters.podSelector, obj)

case factory.PeerNamespaceSelectorType:
extraParameters := h.extraParameters.(*NetworkPolicyExtraParameters)
Expand Down
35 changes: 30 additions & 5 deletions go-controller/pkg/ovn/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -1393,7 +1393,7 @@ func (oc *DefaultNetworkController) handlePeerPodSelectorAddUpdate(np *networkPo
// handlePeerPodSelectorDelete removes the IP address of a pod that no longer
// matches a NetworkPolicy ingress/egress section's selectors from that
// ingress/egress address set
func (oc *DefaultNetworkController) handlePeerPodSelectorDelete(np *networkPolicy, gp *gressPolicy, obj interface{}) error {
func (oc *DefaultNetworkController) handlePeerPodSelectorDelete(np *networkPolicy, gp *gressPolicy, podSelector labels.Selector, obj interface{}) error {
np.RLock()
defer np.RUnlock()
if np.deleted {
Expand All @@ -1404,6 +1404,29 @@ func (oc *DefaultNetworkController) handlePeerPodSelectorDelete(np *networkPolic
klog.Infof("Pod %s/%s not scheduled on any node, skipping it", pod.Namespace, pod.Name)
return nil
}

if util.PodCompleted(pod) {
ips, err := util.GetPodIPsOfNetwork(pod, &util.DefaultNetInfo{})
if err != nil {
return fmt.Errorf("can't get pod IPs %s/%s: %w", pod.Namespace, pod.Name, err)
}

collidingPod, err := oc.findPodWithIPAddresses(ips)
if err != nil {
return fmt.Errorf("lookup for pods with the same IPs [%s] failed: %w", util.JoinIPs(ips, " "), err)
}

if collidingPod != nil {

// If the IP is used by another Pod that is targeted by the same network policy, don't remove the IP from the Address_Set
if podSelector.Matches(labels.Set(collidingPod.Labels)) {
klog.Infof("Not deleting Pod %s/%s IPs [%s] as they are used by %s/%s", pod.Namespace, pod.Name,
util.JoinIPs(ips, " "), collidingPod.Namespace, collidingPod.Name)
return nil
}
}
}

// gressPolicy.deletePeerPod must be called with networkPolicy RLock.
if err := gp.deletePeerPod(pod); err != nil {
return err
Expand Down Expand Up @@ -1439,8 +1462,9 @@ func (oc *DefaultNetworkController) addPeerPodHandler(podSelector *metav1.LabelS
factory.PeerPodSelectorType,
syncFunc,
&NetworkPolicyExtraParameters{
np: np,
gp: gp,
np: np,
gp: gp,
podSelector: sel,
})

podHandler, err := retryPeerPods.WatchResourceFiltered(namespace, sel)
Expand Down Expand Up @@ -1477,8 +1501,9 @@ func (oc *DefaultNetworkController) handlePeerNamespaceAndPodAdd(np *networkPoli
factory.PeerPodForNamespaceAndPodSelectorType,
syncFunc,
&NetworkPolicyExtraParameters{
gp: gp,
np: np,
gp: gp,
np: np,
podSelector: podSelector,
},
)
// syncFunc and factory.PeerPodForNamespaceAndPodSelectorType add event handler also take np.RLock,
Expand Down
87 changes: 87 additions & 0 deletions go-controller/pkg/ovn/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"

kapi "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
knet "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -1535,6 +1536,92 @@ var _ = ginkgo.Describe("OVN NetworkPolicy Operations", func() {
gomega.Expect(err).NotTo(gomega.HaveOccurred())
})

// Regression scenario: a pod finishes (phase Succeeded), a new running pod is
// created with the same IP, and only afterwards the completed pod object is
// deleted. The network policy address set must still contain the IP, because
// it now belongs to the running pod.
ginkgo.It("reconciles a completed and deleted pod whose IP has been assigned to a running pod", func() {
app.Action = func(ctx *cli.Context) error {
namespace1 := *newNamespace(namespaceName1)
nodeName := "node1"

// Use simple allow-same-namespace network policy
networkPolicy := newNetworkPolicy("networkpolicy1", namespace1.Name,
metav1.LabelSelector{},
[]knet.NetworkPolicyIngressRule{{
From: []knet.NetworkPolicyPeer{{
PodSelector: &metav1.LabelSelector{},
}},
}},
[]knet.NetworkPolicyEgressRule{})

fakeOvn.startWithDBSetup(initialDB,
&v1.NamespaceList{
Items: []v1.Namespace{
namespace1,
},
},
&knet.NetworkPolicyList{
Items: []knet.NetworkPolicy{
*networkPolicy,
},
},
)

// Register the node switch with a small /29 subnet for this test.
err := fakeOvn.controller.lsManager.AddSwitch(
nodeName,
getLogicalSwitchUUID(fakeOvn.controller.nbClient, nodeName),
[]*net.IPNet{ovntest.MustParseIPNet("10.128.1.0/29")})
gomega.Expect(err).NotTo(gomega.HaveOccurred())

fakeOvn.controller.WatchNamespaces()
fakeOvn.controller.WatchPods()
fakeOvn.controller.WatchNetworkPolicy()

// Start a pod
completedPod, err := fakeOvn.fakeClient.KubeClient.CoreV1().Pods(namespace1.Name).
Create(
context.TODO(),
newPod(namespace1.Name, "completed-pod", nodeName, "10.128.1.3"),
metav1.CreateOptions{})
gomega.Expect(err).NotTo(gomega.HaveOccurred())

eventuallyExpectAddressSetsWithIP(fakeOvn, networkPolicy, "10.128.1.3")

// Consume the entire node ip pool
fakeOvn.controller.lsManager.AllocateUntilFull(nodeName)

// Mark the pod as Completed, so the "10.128.1.3" moves back to the allocatable pool
completedPod.Status.Phase = kapi.PodSucceeded
completedPod, err = fakeOvn.fakeClient.KubeClient.CoreV1().Pods(completedPod.Namespace).
Update(context.TODO(), completedPod, metav1.UpdateOptions{})
gomega.Expect(err).NotTo(gomega.HaveOccurred())

// Once the pod is completed its IP must be gone from the policy address sets.
eventuallyExpectEmptyAddressSetsExist(fakeOvn, networkPolicy)

// Spawn a pod with an IP address that collides with a completed pod
_, err = fakeOvn.fakeClient.KubeClient.CoreV1().Pods(namespace1.Name).
Create(
context.TODO(),
newPod(namespace1.Name, "running-pod", nodeName, "10.128.1.3"),
metav1.CreateOptions{})
gomega.Expect(err).NotTo(gomega.HaveOccurred())

// The running pod puts the shared IP back into the address sets.
eventuallyExpectAddressSetsWithIP(fakeOvn, networkPolicy, "10.128.1.3")

// Simulate garbage collector: deletes all completed pods
err = fakeOvn.fakeClient.KubeClient.CoreV1().Pods(completedPod.Namespace).Delete(context.TODO(), completedPod.Name, *metav1.NewDeleteOptions(0))
gomega.Expect(err).NotTo(gomega.HaveOccurred())

// Wait for every controller to have finished its work
// NOTE(review): a fixed sleep is timing dependent; the check below could pass
// before the delete event is handled if processing takes longer than this.
time.Sleep(200 * time.Millisecond)

// Running pod policy should not be affected by pod deletions
eventuallyExpectAddressSetsWithIP(fakeOvn, networkPolicy, "10.128.1.3")

return nil
}

err := app.Run([]string{app.Name})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
})

ginkgo.It("reconciles a deleted pod referenced by a networkpolicy in another namespace", func() {
app.Action = func(ctx *cli.Context) error {
namespace1 := *newNamespace(namespaceName1)
Expand Down

0 comments on commit 117c69f

Please sign in to comment.