diff --git a/go-controller/pkg/ovn/base_network_controller_egressip.go b/go-controller/pkg/ovn/base_network_controller_egressip.go index 0c9ea4096be..5849df1a369 100644 --- a/go-controller/pkg/ovn/base_network_controller_egressip.go +++ b/go-controller/pkg/ovn/base_network_controller_egressip.go @@ -73,6 +73,18 @@ func getEgressIPLRPReRouteDbIDs(egressIPName, podNamespace, podName string, ipFa }) } +func getEIPLRPObjK8MetaData(externalIDs map[string]string) (string, string) { + objMetaDataRaw := externalIDs[libovsdbops.ObjectNameKey.String()] + if objMetaDataRaw == "" || !strings.Contains(objMetaDataRaw, "_") || !strings.Contains(objMetaDataRaw, "/") { + return "", "" + } + objMetaDataSplit := strings.Split(objMetaDataRaw, "_") + if len(objMetaDataSplit) != 2 { + return "", "" + } + return objMetaDataSplit[0], objMetaDataSplit[1] // EgressIP name and "podNamespace/podName" +} + func getEgressIPLRPNoReRoutePodToJoinDbIDs(ipFamily egressIPFamilyValue, controller string) *libovsdbops.DbObjectIDs { return libovsdbops.NewDbObjectIDs(libovsdbops.LogicalRouterPolicyEgressIP, controller, map[libovsdbops.ExternalIDKey]string{ libovsdbops.ObjectNameKey: string(NoReRoutePodToJoin), @@ -982,6 +994,8 @@ type egressIPCacheEntry struct { // to this zone which are serving this egressIP object.. // This will help sync SNATs egressLocalNodes sets.Set[string] + // packet mark for primary secondary networks + mark string } func (bnc *BaseNetworkController) syncEgressIPs(namespaces []interface{}) error { @@ -1020,6 +1034,10 @@ func (bnc *BaseNetworkController) syncEgressIPs(namespaces []interface{}) error if err = bnc.syncStaleAddressSetIPs(egressIPCache); err != nil { return fmt.Errorf("syncEgressIPs unable to reset stale address IPs: %v", err) } + if err := bnc.syncStaleGWMarkRules(egressIPCache); err != nil { + return fmt.Errorf("syncEgressIPs unable to sync GW packet mark rules: %v", err) + } + return nil } @@ -1064,6 +1082,172 @@ func (bnc *BaseNetworkController) syncStaleAddressSetIPs(egressIPCache map[strin return nil } +// syncStaleGWMarkRules removes stale or invalid LRP that packet mark. They are attached to egress nodes gateway router. +// It adds expected LRPs that packet mark. +func (bnc *BaseNetworkController) syncStaleGWMarkRules(egressIPCache map[string]egressIPCacheEntry) error { + // Delete all stale LRPs then add missing LRPs + // This func assumes one node per zone. It determines if an LRP is a valid local LRP. It doesn't determine if the + // LRP is attached to the correct GW router + if !util.IsNetworkSegmentationSupportEnabled() || !config.OVNKubernetesFeature.EnableInterconnect || bnc.IsDefault() { + return nil + } + invalidLRPPredicate := func(item *nbdb.LogicalRouterPolicy) bool { + if item.Priority != types.EgressIPSNATMarkPriority || item.Action != nbdb.LogicalRouterPolicyActionAllow { + return false + } + // skip if owned by another controller + if item.ExternalIDs[libovsdbops.OwnerControllerKey.String()] != bnc.controllerName { + return false + } + eIPName, podNamespaceName := getEIPLRPObjK8MetaData(item.ExternalIDs) + if eIPName == "" || podNamespaceName == "" { + klog.Errorf("Sync stale SNAT Mark rules: unable to process logical router policy because invalid meta data") + return true + } + cacheEntry, exists := egressIPCache[eIPName] + // if EgressIP doesn't exist, its stale + if !exists { + return true + } + // if theres no local egress nodes, the LRP must be invalid + if cacheEntry.egressLocalNodes.Len() == 0 { + return true + } + ipsLocal, okLocal := cacheEntry.egressLocalPods[podNamespaceName] + ipsRemote, okRemote := cacheEntry.egressRemotePods[podNamespaceName] + // if pod doesn't exist locally or remote, its stale + if !okLocal && !okRemote { + return true + } + var ips sets.Set[string] + if okLocal { + ips = ipsLocal + } + if okRemote { + ips = ipsRemote + } + podIP := getPodIPFromEIPSNATMarkMatch(item.Match) + if podIP == "" { + // invalid match + return true + } + if !ips.Has(podIP) { + return true + } + // FIXME: not multi node per zone aware. Doesn't try to find out if the LRP is on the correct nodes GW router + pktMarkValue, ok := item.Options["pkt_mark"] + if !ok || cacheEntry.mark != "" && pktMarkValue != cacheEntry.mark { + return true + } + return false + } + invalidLRPs, err := libovsdbops.FindLogicalRouterPoliciesWithPredicate(bnc.nbClient, invalidLRPPredicate) + if err != nil { + return fmt.Errorf("unable to retrieve invalid SNAT mark logical router polices: %v", err) + } + if len(invalidLRPs) == 0 { + return nil + } + // gather UUIDs of invalid LRPs + invalidLRPUUIDs := sets.New[string]() + for _, invalidLRP := range invalidLRPs { + invalidLRPUUIDs.Insert(invalidLRP.UUID) + } + // gather local node names + localNodeNames := make([]string, 0, 1) + bnc.localZoneNodes.Range(func(nodeName, _ any) bool { + localNodeNames = append(localNodeNames, nodeName.(string)) + return true + }) + invalidLRPPredicate = func(item *nbdb.LogicalRouterPolicy) bool { + if invalidLRPUUIDs.Has(item.UUID) { + return true + } + return false + } + for _, nodeName := range localNodeNames { + routerName := bnc.GetNetworkScopedGWRouterName(nodeName) + lrps, err := libovsdbops.FindALogicalRouterPoliciesWithPredicate(bnc.nbClient, routerName, invalidLRPPredicate) + if err != nil { + if errors.Is(err, libovsdbclient.ErrNotFound) { + continue + } + return fmt.Errorf("failed to find gateway routers (%s) invalid logical router policies: %v", routerName, err) + } + if err = libovsdbops.DeleteLogicalRouterPolicies(bnc.nbClient, routerName, lrps...); err != nil && !errors.Is(err, libovsdbclient.ErrNotFound) { + return fmt.Errorf("failed to delete gateway routers (%s) invalid logical router policies: %v", routerName, err) + } + } + // ensure all LRPs to mark pods are present + IPv4Mode, IPv6Mode := bnc.IPMode() + isSupportedIP := func(podIP net.IP) bool { + isIPv6 := utilnet.IsIPv6(podIP) + if isIPv6 && IPv6Mode { + return true + } + if !isIPv6 && IPv4Mode { + return true + } + return false + } + + processPodFn := func(ops []ovsdb.Operation, eIPName, podKey, mark, routerName string, podIPs sets.Set[string], isEIPIPv6 bool) ([]ovsdb.Operation, error) { + podNamespace, podName := getPodNamespaceAndNameFromKey(podKey) + dbIDs := getEgressIPLRPSNATMarkDbIDs(eIPName, podNamespace, podName, getEIPIPFamily(isEIPIPv6), bnc.controllerName) + for _, podIPStr := range podIPs.UnsortedList() { + podIP := net.ParseIP(podIPStr) + if podIP == nil || utilnet.IsIPv6(podIP) != isEIPIPv6 && !isSupportedIP(podIP) { + continue + } + lrp := nbdb.LogicalRouterPolicy{ + Match: fmt.Sprintf("%s.src == %s", getEIPIPFamily(isEIPIPv6), podIPStr), + Priority: types.EgressIPSNATMarkPriority, + Action: nbdb.LogicalRouterPolicyActionAllow, + ExternalIDs: dbIDs.GetExternalIDs(), + Options: map[string]string{"pkt_mark": mark}, + } + p := libovsdbops.GetPredicate[*nbdb.LogicalRouterPolicy](dbIDs, nil) + ops, err = libovsdbops.CreateOrUpdateLogicalRouterPolicyWithPredicateOps(bnc.nbClient, ops, routerName, &lrp, p) + if err != nil { + return ops, fmt.Errorf("error creating logical router policy %+v create/update ops for packet marking on router %s: %v", lrp, routerName, err) + } + } + return ops, nil + } + + var ops []ovsdb.Operation + for eIPName, cache := range egressIPCache { + if cache.mark == "" { + continue + } + for eIP, nodeName := range cache.egressIPs { + if !cache.egressLocalNodes.Has(nodeName) { + continue + } + routerName := bnc.GetNetworkScopedGWRouterName(nodeName) + isEIPIPv6 := utilnet.IsIPv6String(eIP) + for podKey, podIPs := range cache.egressLocalPods { + ops, err = processPodFn(ops, eIPName, podKey, cache.mark, routerName, podIPs, isEIPIPv6) + if err != nil { + return fmt.Errorf("failed to add process local pod pod %s gateway router SNAT mark: %v", podKey, err) + } + } + for podKey, podIPs := range cache.egressRemotePods { + ops, err = processPodFn(ops, eIPName, podKey, cache.mark, routerName, podIPs, isEIPIPv6) + if err != nil { + return fmt.Errorf("failed to add process remote pod pod %s gateway router SNAT mark: %v", podKey, err) + } + } + } + return nil + } + _, err = libovsdbops.TransactAndCheck(bnc.nbClient, ops) + if err != nil { + return fmt.Errorf("error transacting ops %+v: %v", ops, err) + } + return nil +} + // syncPodAssignmentCache rebuilds the internal pod cache used by the egressIP feature. // We use the existing kapi and ovn-db information to populate oc.eIPC.podAssignment cache for // all the pods that are managed by egressIPs. @@ -1159,12 +1343,11 @@ func (bnc *BaseNetworkController) syncStaleEgressReroutePolicy(egressIPCache map if item.Priority != types.EgressIPReroutePriority || item.ExternalIDs[libovsdbops.OwnerControllerKey.String()] != bnc.controllerName { return false } - objReferences := strings.Split(item.ExternalIDs[libovsdbops.ObjectNameKey.String()], "_") - if len(objReferences) != 2 { - klog.Errorf("Failed to process Logical Router Policy. Unexpected external IDs for object name: %v", objReferences) + egressIPName, _ := getEIPLRPObjK8MetaData(item.ExternalIDs) + if egressIPName == "" { + klog.Errorf("syncStaleEgressReroutePolicy found logical router policy (UUID: %s) with invalid meta data", item.UUID) return false } - egressIPName := objReferences[0] cacheEntry, exists := egressIPCache[egressIPName] splitMatch := strings.Split(item.Match, " ") logicalIP := splitMatch[len(splitMatch)-1] @@ -1322,12 +1505,17 @@ func (bnc *BaseNetworkController) generateCacheForEgressIP() (map[string]egressI return nil, err } for _, egressIP := range egressIPs { + mark, err := util.ParseEgressIPMark(egressIP.Annotations) + if err != nil { + klog.Errorf("Failed to parse EgressIP %s mark: %v", egressIP.Name, err) + } egressIPCache[egressIP.Name] = egressIPCacheEntry{ egressLocalPods: make(map[string]sets.Set[string]), egressRemotePods: make(map[string]sets.Set[string]), gatewayRouterIPs: sets.New[string](), // can be transit switchIPs for interconnect multizone setup egressIPs: map[string]string{}, egressLocalNodes: sets.New[string](), + mark: mark.ToString(), } for _, status := range egressIP.Status.Items { var nextHopIP string @@ -2704,6 +2892,18 @@ func getEgressIPPktMark(eipName string, annotations map[string]string) util.Egre return mark } +func getPodIPFromEIPSNATMarkMatch(match string) string { + //format ${IP family}.src == ${pod IP} + if match == "" { + return "" + } + matchSplit := strings.Split(match, " ") + if len(matchSplit) != 3 { + return "" + } + return matchSplit[2] +} + func getEIPIPFamily(isIPv6 bool) egressIPFamilyValue { if isIPv6 { return IPFamilyValueV6