Skip to content

Commit

Permalink
EIP OVN controller: add sync for GW pkt mark LRPs
Browse files Browse the repository at this point in the history
Remove stale GW pkt mark LRPs and add any required.

Units tests in previous commit for UDN l3. See sync
tests.

Signed-off-by: Martin Kennelly <mkennell@redhat.com>
  • Loading branch information
martinkennelly committed Jul 29, 2024
1 parent 1da7aeb commit 1cc65e7
Showing 1 changed file with 204 additions and 4 deletions.
208 changes: 204 additions & 4 deletions go-controller/pkg/ovn/base_network_controller_egressip.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,18 @@ func getEgressIPLRPReRouteDbIDs(egressIPName, podNamespace, podName string, ipFa
})
}

func getEIPLRPObjK8MetaData(externalIDs map[string]string) (string, string) {
objMetaDataRaw := externalIDs[libovsdbops.ObjectNameKey.String()]
if objMetaDataRaw == "" || !strings.Contains(objMetaDataRaw, "_") || !strings.Contains(objMetaDataRaw, "/") {
return "", ""
}
objMetaDataSplit := strings.Split(objMetaDataRaw, "_")
if len(objMetaDataSplit) != 2 {
return "", ""
}
return objMetaDataSplit[0], objMetaDataSplit[1] // EgressIP name and "podNamespace/podName"
}

func getEgressIPLRPNoReRoutePodToJoinDbIDs(ipFamily egressIPFamilyValue, controller string) *libovsdbops.DbObjectIDs {
return libovsdbops.NewDbObjectIDs(libovsdbops.LogicalRouterPolicyEgressIP, controller, map[libovsdbops.ExternalIDKey]string{
libovsdbops.ObjectNameKey: string(NoReRoutePodToJoin),
Expand Down Expand Up @@ -982,6 +994,8 @@ type egressIPCacheEntry struct {
// to this zone which are serving this egressIP object..
// This will help sync SNATs
egressLocalNodes sets.Set[string]
// packet mark for primary secondary networks
mark string
}

func (bnc *BaseNetworkController) syncEgressIPs(namespaces []interface{}) error {
Expand Down Expand Up @@ -1020,6 +1034,10 @@ func (bnc *BaseNetworkController) syncEgressIPs(namespaces []interface{}) error
if err = bnc.syncStaleAddressSetIPs(egressIPCache); err != nil {
return fmt.Errorf("syncEgressIPs unable to reset stale address IPs: %v", err)
}
if err := bnc.syncStaleGWMarkRules(egressIPCache); err != nil {
return fmt.Errorf("syncEgressIPs unable to sync GW packet mark rules: %v", err)
}

return nil
}

Expand Down Expand Up @@ -1064,6 +1082,172 @@ func (bnc *BaseNetworkController) syncStaleAddressSetIPs(egressIPCache map[strin
return nil
}

// syncStaleGWMarkRules removes stale or invalid LRP that packet mark. They are attached to egress nodes gateway router.
// It adds expected LRPs that packet mark.
func (bnc *BaseNetworkController) syncStaleGWMarkRules(egressIPCache map[string]egressIPCacheEntry) error {
// Delete all stale LRPs then add missing LRPs
// This func assumes one node per zone. It determines if an LRP is a valid local LRP. It doesn't determine if the
// LRP is attached to the correct GW router
if !util.IsNetworkSegmentationSupportEnabled() || !config.OVNKubernetesFeature.EnableInterconnect || bnc.IsDefault() {
return nil
}
invalidLRPPredicate := func(item *nbdb.LogicalRouterPolicy) bool {
if item.Priority != types.EgressIPSNATMarkPriority || item.Action != nbdb.LogicalRouterPolicyActionAllow {
return false
}
// skip if owned by another controller
if item.ExternalIDs[libovsdbops.OwnerControllerKey.String()] != bnc.controllerName {
return false
}
eIPName, podNamespaceName := getEIPLRPObjK8MetaData(item.ExternalIDs)
if eIPName == "" || podNamespaceName == "" {
klog.Errorf("Sync stale SNAT Mark rules: unable to process logical router policy because invalid meta data")
return true
}
cacheEntry, exists := egressIPCache[eIPName]
// if EgressIP doesn't exist, its stale
if !exists {
return true
}
// if theres no local egress nodes, the LRP must be invalid
if cacheEntry.egressLocalNodes.Len() == 0 {
return true
}
ipsLocal, okLocal := cacheEntry.egressLocalPods[podNamespaceName]
ipsRemote, okRemote := cacheEntry.egressRemotePods[podNamespaceName]
// if pod doesn't exist locally or remote, its stale
if !okLocal && !okRemote {
return true
}
var ips sets.Set[string]
if okLocal {
ips = ipsLocal
}
if okRemote {
ips = ipsRemote
}
podIP := getPodIPFromEIPSNATMarkMatch(item.Match)
if podIP == "" {
// invalid match
return true
}
if !ips.Has(podIP) {
return true
}
// FIXME: not multi node per zone aware. Doesn't try to find out if the LRP is on the correct nodes GW router
pktMarkValue, ok := item.Options["pkt_mark"]
if !ok || cacheEntry.mark != "" && pktMarkValue != cacheEntry.mark {
return true
}
return false
}
invalidLRPs, err := libovsdbops.FindLogicalRouterPoliciesWithPredicate(bnc.nbClient, invalidLRPPredicate)
if err != nil {
return fmt.Errorf("unable to retrieve invalid SNAT mark logical router polices: %v", err)
}
if len(invalidLRPs) == 0 {
return nil
}
// gather UUIDs of invalid LRPs
invalidLRPUUIDs := sets.New[string]()
for _, invalidLRP := range invalidLRPs {
invalidLRPUUIDs.Insert(invalidLRP.UUID)
}
// gather local node names
localNodeNames := make([]string, 0, 1)
bnc.localZoneNodes.Range(func(nodeName, _ any) bool {
localNodeNames = append(localNodeNames, nodeName.(string))
return true
})
invalidLRPPredicate = func(item *nbdb.LogicalRouterPolicy) bool {
if invalidLRPUUIDs.Has(item.UUID) {
return true
}
return false
}
for _, nodeName := range localNodeNames {
routerName := bnc.GetNetworkScopedGWRouterName(nodeName)
lrps, err := libovsdbops.FindALogicalRouterPoliciesWithPredicate(bnc.nbClient, routerName, invalidLRPPredicate)
if err != nil {
if errors.Is(err, libovsdbclient.ErrNotFound) {
continue
}
return fmt.Errorf("failed to find gateway routers (%s) invalid logical router policies: %v", routerName, err)
}
if err = libovsdbops.DeleteLogicalRouterPolicies(bnc.nbClient, routerName, lrps...); err != nil && !errors.Is(err, libovsdbclient.ErrNotFound) {
return fmt.Errorf("failed to delete gateway routers (%s) invalid logical router policies: %v", routerName, err)
}
}
// ensure all LRPs to mark pods are present
IPv4Mode, IPv6Mode := bnc.IPMode()
isSupportedIP := func(podIP net.IP) bool {
isIPv6 := utilnet.IsIPv6(podIP)
if isIPv6 && IPv6Mode {
return true
}
if !isIPv6 && IPv4Mode {
return true
}
return false
}

processPodFn := func(ops []ovsdb.Operation, eIPName, podKey, mark, routerName string, podIPs sets.Set[string], isEIPIPv6 bool) ([]ovsdb.Operation, error) {
podNamespace, podName := getPodNamespaceAndNameFromKey(podKey)
dbIDs := getEgressIPLRPSNATMarkDbIDs(eIPName, podNamespace, podName, getEIPIPFamily(isEIPIPv6), bnc.controllerName)
for _, podIPStr := range podIPs.UnsortedList() {
podIP := net.ParseIP(podIPStr)
if podIP == nil || utilnet.IsIPv6(podIP) != isEIPIPv6 && !isSupportedIP(podIP) {
continue
}
lrp := nbdb.LogicalRouterPolicy{
Match: fmt.Sprintf("%s.src == %s", getEIPIPFamily(isEIPIPv6), podIPStr),
Priority: types.EgressIPSNATMarkPriority,
Action: nbdb.LogicalRouterPolicyActionAllow,
ExternalIDs: dbIDs.GetExternalIDs(),
Options: map[string]string{"pkt_mark": mark},
}
p := libovsdbops.GetPredicate[*nbdb.LogicalRouterPolicy](dbIDs, nil)
ops, err = libovsdbops.CreateOrUpdateLogicalRouterPolicyWithPredicateOps(bnc.nbClient, ops, routerName, &lrp, p)
if err != nil {
return ops, fmt.Errorf("error creating logical router policy %+v create/update ops for packet marking on router %s: %v", lrp, routerName, err)
}
}
return ops, nil
}

var ops []ovsdb.Operation
for eIPName, cache := range egressIPCache {
if cache.mark == "" {
continue
}
for eIP, nodeName := range cache.egressIPs {
if !cache.egressLocalNodes.Has(nodeName) {
continue
}
routerName := bnc.GetNetworkScopedGWRouterName(nodeName)
isEIPIPv6 := utilnet.IsIPv6String(eIP)
for podKey, podIPs := range cache.egressLocalPods {
ops, err = processPodFn(ops, eIPName, podKey, cache.mark, routerName, podIPs, isEIPIPv6)
if err != nil {
return fmt.Errorf("failed to add process local pod pod %s gateway router SNAT mark: %v", podKey, err)
}
}
for podKey, podIPs := range cache.egressRemotePods {
ops, err = processPodFn(ops, eIPName, podKey, cache.mark, routerName, podIPs, isEIPIPv6)
if err != nil {
return fmt.Errorf("failed to add process remote pod pod %s gateway router SNAT mark: %v", podKey, err)
}
}
}
return nil
}
_, err = libovsdbops.TransactAndCheck(bnc.nbClient, ops)
if err != nil {
return fmt.Errorf("error transacting ops %+v: %v", ops, err)
}
return nil
}

// syncPodAssignmentCache rebuilds the internal pod cache used by the egressIP feature.
// We use the existing kapi and ovn-db information to populate oc.eIPC.podAssignment cache for
// all the pods that are managed by egressIPs.
Expand Down Expand Up @@ -1159,12 +1343,11 @@ func (bnc *BaseNetworkController) syncStaleEgressReroutePolicy(egressIPCache map
if item.Priority != types.EgressIPReroutePriority || item.ExternalIDs[libovsdbops.OwnerControllerKey.String()] != bnc.controllerName {
return false
}
objReferences := strings.Split(item.ExternalIDs[libovsdbops.ObjectNameKey.String()], "_")
if len(objReferences) != 2 {
klog.Errorf("Failed to process Logical Router Policy. Unexpected external IDs for object name: %v", objReferences)
egressIPName, _ := getEIPLRPObjK8MetaData(item.ExternalIDs)
if egressIPName == "" {
klog.Errorf("syncStaleEgressReroutePolicy found logical router policy (UUID: %s) with invalid meta data", item.UUID)
return false
}
egressIPName := objReferences[0]
cacheEntry, exists := egressIPCache[egressIPName]
splitMatch := strings.Split(item.Match, " ")
logicalIP := splitMatch[len(splitMatch)-1]
Expand Down Expand Up @@ -1322,12 +1505,17 @@ func (bnc *BaseNetworkController) generateCacheForEgressIP() (map[string]egressI
return nil, err
}
for _, egressIP := range egressIPs {
mark, err := util.ParseEgressIPMark(egressIP.Annotations)
if err != nil {
klog.Errorf("Failed to parse EgressIP %s mark: %v", egressIP.Name, err)
}
egressIPCache[egressIP.Name] = egressIPCacheEntry{
egressLocalPods: make(map[string]sets.Set[string]),
egressRemotePods: make(map[string]sets.Set[string]),
gatewayRouterIPs: sets.New[string](), // can be transit switchIPs for interconnect multizone setup
egressIPs: map[string]string{},
egressLocalNodes: sets.New[string](),
mark: mark.ToString(),
}
for _, status := range egressIP.Status.Items {
var nextHopIP string
Expand Down Expand Up @@ -2704,6 +2892,18 @@ func getEgressIPPktMark(eipName string, annotations map[string]string) util.Egre
return mark
}

func getPodIPFromEIPSNATMarkMatch(match string) string {
//format ${IP family}.src == ${pod IP}
if match == "" {
return ""
}
matchSplit := strings.Split(match, " ")
if len(matchSplit) != 3 {
return ""
}
return matchSplit[2]
}

func getEIPIPFamily(isIPv6 bool) egressIPFamilyValue {
if isIPv6 {
return IPFamilyValueV6
Expand Down

0 comments on commit 1cc65e7

Please sign in to comment.