Skip to content

Commit

Permalink
Support EgressNetworkPolicy in SDN plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
danwinship committed Jul 29, 2016
1 parent 5aaf3cb commit 2f0255e
Show file tree
Hide file tree
Showing 5 changed files with 271 additions and 19 deletions.
158 changes: 157 additions & 1 deletion pkg/sdn/plugin/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ import (

kapi "k8s.io/kubernetes/pkg/api"
kexec "k8s.io/kubernetes/pkg/util/exec"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sysctl"
utilwait "k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
)

const (
Expand Down Expand Up @@ -255,7 +258,7 @@ func (plugin *OsdnNode) SetupSDN(localSubnetCIDR, clusterNetworkCIDR, servicesNe
otx.AddFlow("table=5, priority=200, ip, nw_dst=%s, actions=goto_table:7", localSubnetCIDR)
otx.AddFlow("table=5, priority=100, arp, nw_dst=%s, actions=goto_table:8", clusterNetworkCIDR)
otx.AddFlow("table=5, priority=100, ip, nw_dst=%s, actions=goto_table:8", clusterNetworkCIDR)
otx.AddFlow("table=5, priority=0, ip, actions=output:2")
otx.AddFlow("table=5, priority=0, ip, actions=goto_table:9")
otx.AddFlow("table=5, priority=0, arp, actions=drop")

// Table 6: ARP to container, filled in by openshift-sdn-ovs
Expand All @@ -272,6 +275,10 @@ func (plugin *OsdnNode) SetupSDN(localSubnetCIDR, clusterNetworkCIDR, servicesNe
// eg, "table=8, priority=100, ip, nw_dst=${remote_subnet_cidr}, actions=move:NXM_NX_REG0[]->NXM_NX_TUN_ID[0..31], set_field:${remote_node_ip}->tun_dst,output:1"
otx.AddFlow("table=8, priority=0, actions=drop")

// Table 9: egress network policy dispatch; edited by updateEgressNetworkPolicy()
// eg, "table=9, reg0=${tenant_id}, priority=2, ip, nw_dst=${external_cidr}, actions=drop
otx.AddFlow("table=9, priority=0, actions=output:2")

err = otx.EndTransaction()
if err != nil {
return false, err
Expand Down Expand Up @@ -330,6 +337,155 @@ func (plugin *OsdnNode) SetupSDN(localSubnetCIDR, clusterNetworkCIDR, servicesNe
return true, nil
}

func (plugin *OsdnNode) SetupEgressNetworkPolicy() error {
policies, err := plugin.registry.GetEgressNetworkPolicies()
if err != nil {
return fmt.Errorf("Could not get EgressNetworkPolicies: %s", err)
}

for _, policy := range policies {
vnid, err := plugin.vnids.GetVNID(policy.Namespace)
if err != nil {
glog.Warningf("Could not find netid for namespace %q: %v", policy.Namespace, err)
continue
}
plugin.egressPolicies[vnid] = append(plugin.egressPolicies[vnid], &policy)
}

for vnid := range plugin.egressPolicies {
err := plugin.updateEgressNetworkPolicy(vnid)
if err != nil {
return err
}
}

go utilwait.Forever(plugin.watchEgressNetworkPolicies, 0)
return nil
}

func (plugin *OsdnNode) watchEgressNetworkPolicies() {
eventQueue := plugin.registry.RunEventQueue(EgressNetworkPolicies)

for {
eventType, obj, err := eventQueue.Pop()
if err != nil {
utilruntime.HandleError(fmt.Errorf("EventQueue failed for EgressNetworkPolicy: %v", err))
return
}
policy := obj.(*osapi.EgressNetworkPolicy)

vnid, err := plugin.vnids.GetVNID(policy.Namespace)
if err != nil {
glog.Warningf("Could not find netid for namespace %q: %v", policy.Namespace, err)
continue
}

policies := plugin.egressPolicies[vnid]
for i, oldPolicy := range policies {
if oldPolicy.UID == policy.UID {
policies = append(policies[:i], policies[i+1:]...)
break
}
}
if eventType != watch.Deleted && len(policy.Spec.Egress) > 0 {
policies = append(policies, policy)
}
plugin.egressPolicies[vnid] = policies

err = plugin.updateEgressNetworkPolicy(vnid)
if err != nil {
utilruntime.HandleError(err)
return
}
}
}

func (plugin *OsdnNode) UpdateEgressNetworkPolicyVNID(namespace string, oldVnid, newVnid uint32) error {
var policy *osapi.EgressNetworkPolicy

policies := plugin.egressPolicies[oldVnid]
for i, oldPolicy := range policies {
if oldPolicy.Namespace == namespace {
policy = oldPolicy
plugin.egressPolicies[oldVnid] = append(policies[:i], policies[i+1:]...)
err := plugin.updateEgressNetworkPolicy(oldVnid)
if err != nil {
return err
}
break
}
}

if policy != nil {
plugin.egressPolicies[newVnid] = append(plugin.egressPolicies[newVnid], policy)
err := plugin.updateEgressNetworkPolicy(newVnid)
if err != nil {
return err
}
}

return nil
}

func policyNames(policies []*osapi.EgressNetworkPolicy) string {
names := make([]string, len(policies))
for i, policy := range policies {
names[i] = policy.Namespace + ":" + policy.Name
}
return strings.Join(names, ", ")
}

func (plugin *OsdnNode) updateEgressNetworkPolicy(vnid uint32) error {
otx := ovs.NewTransaction(kexec.New(), BR)

policies := plugin.egressPolicies[vnid]
namespaces := plugin.vnids.GetNamespaces(vnid)
if len(policies) == 0 {
otx.DeleteFlows("table=9, reg0=%d", vnid)
} else if vnid == 0 {
glog.Errorf("EgressNetworkPolicy in global network namespace is not allowed (%s); ignoring", policyNames(policies))
} else if len(namespaces) > 1 {
glog.Errorf("EgressNetworkPolicy not allowed in shared NetNamespace (%s); dropping all traffic", strings.Join(namespaces, ", "))
otx.DeleteFlows("table=9, reg0=%d", vnid)
otx.AddFlow("table=9, reg0=%d, priority=1, actions=drop", vnid)
} else if len(policies) > 1 {
glog.Errorf("multiple EgressNetworkPolicies in same network namespace (%s) is not allowed; dropping all traffic", policyNames(policies))
otx.DeleteFlows("table=9, reg0=%d", vnid)
otx.AddFlow("table=9, reg0=%d, priority=1, actions=drop", vnid)
} else /* vnid != 0 && len(policies) == 1 */ {
// Temporarily drop all outgoing traffic, to avoid race conditions while modifying the other rules
otx.AddFlow("table=9, reg0=%d, cookie=1, priority=65535, actions=drop", vnid)
otx.DeleteFlows("table=9, reg0=%d, cookie=0/1", vnid)

for i, rule := range policies[0].Spec.Egress {
priority := len(policies[0].Spec.Egress) - i

var action string
if rule.Type == osapi.EgressNetworkPolicyRuleAllow {
action = "output:2"
} else {
action = "drop"
}

var dst string
if rule.To.CIDRSelector == "0.0.0.0/32" {
dst = ""
} else {
dst = fmt.Sprintf(", nw_dst=%s", rule.To.CIDRSelector)
}

otx.AddFlow("table=9, reg0=%d, priority=%d, ip%s, actions=%s", vnid, priority, dst, action)
}
otx.DeleteFlows("table=9, reg0=%d, cookie=1/1", vnid)
}

err := otx.EndTransaction()
if err != nil {
return fmt.Errorf("Error updating OVS flows for EgressNetworkPolicy: %v", err)
}
return nil
}

func (plugin *OsdnNode) AddHostSubnetRules(subnet *osapi.HostSubnet) error {
glog.Infof("AddHostSubnetRules for %s", hostSubnetToString(subnet))
otx := ovs.NewTransaction(kexec.New(), BR)
Expand Down
5 changes: 5 additions & 0 deletions pkg/sdn/plugin/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type OsdnNode struct {
vnids *vnidMap
iptablesSyncPeriod time.Duration
mtu uint32
egressPolicies map[uint32][]*osapi.EgressNetworkPolicy
}

// Called by higher layers to create the plugin SDN node instance
Expand Down Expand Up @@ -71,6 +72,7 @@ func NewNodePlugin(pluginName string, osClient *osclient.Client, kClient *kclien
podNetworkReady: make(chan struct{}),
iptablesSyncPeriod: iptablesSyncPeriod,
mtu: mtu,
egressPolicies: make(map[uint32][]*osapi.EgressNetworkPolicy),
}
return plugin, nil
}
Expand All @@ -96,6 +98,9 @@ func (node *OsdnNode) Start() error {
if err = node.VnidStartNode(); err != nil {
return err
}
if err = node.SetupEgressNetworkPolicy(); err != nil {
return err
}
}

if networkChanged {
Expand Down
71 changes: 69 additions & 2 deletions pkg/sdn/plugin/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/golang/glog"

osclient "github.com/openshift/origin/pkg/client"
osapi "github.com/openshift/origin/pkg/sdn/api"
"github.com/openshift/origin/pkg/sdn/plugin/api"

kapi "k8s.io/kubernetes/pkg/api"
Expand All @@ -19,10 +20,16 @@ import (
"k8s.io/kubernetes/pkg/watch"
)

type proxyFirewallItem struct {
policy osapi.EgressNetworkPolicyRuleType
net *net.IPNet
}

type ovsProxyPlugin struct {
registry *Registry
podsByIP map[string]*kapi.Pod
podsMutex sync.Mutex
firewall map[string][]proxyFirewallItem

baseEndpointsHandler pconfig.EndpointsConfigHandler
}
Expand All @@ -36,6 +43,7 @@ func NewProxyPlugin(pluginName string, osClient *osclient.Client, kClient *kclie
return &ovsProxyPlugin{
registry: newRegistry(osClient, kClient),
podsByIP: make(map[string]*kapi.Pod),
firewall: make(map[string][]proxyFirewallItem),
}, nil
}

Expand All @@ -50,15 +58,61 @@ func (proxy *ovsProxyPlugin) Start(baseHandler pconfig.EndpointsConfigHandler) e
return err
}

policies, err := proxy.registry.GetEgressNetworkPolicies()
if err != nil {
return fmt.Errorf("Could not get EgressNetworkPolicies: %s", err)
}
for _, policy := range policies {
proxy.updateNetworkPolicy(policy)
}

for _, pod := range pods {
proxy.trackPod(&pod)
}

go utilwait.Forever(proxy.watchPods, 0)
go utilwait.Forever(proxy.watchEgressNetworkPolicies, 0)

return nil
}

func (proxy *ovsProxyPlugin) watchEgressNetworkPolicies() {
eventQueue := proxy.registry.RunEventQueue(EgressNetworkPolicies)

for {
eventType, obj, err := eventQueue.Pop()
if err != nil {
utilruntime.HandleError(fmt.Errorf("EventQueue failed for EgressNetworkPolicy: %v", err))
return
}
policy := obj.(*osapi.EgressNetworkPolicy)
if eventType == watch.Deleted {
policy.Spec.Egress = nil
}
proxy.updateNetworkPolicy(*policy)
// FIXME: poke the endpoint-syncer somehow...
}
}

func (proxy *ovsProxyPlugin) updateNetworkPolicy(policy osapi.EgressNetworkPolicy) {
firewall := make([]proxyFirewallItem, len(policy.Spec.Egress))
for i, rule := range policy.Spec.Egress {
_, cidr, err := net.ParseCIDR(rule.To.CIDRSelector)
if err != nil {
// should have been caught by validation
glog.Errorf("Illegal CIDR value %q in EgressNetworkPolicy rule", rule.To.CIDRSelector)
return
}
firewall[i] = proxyFirewallItem{rule.Type, cidr}
}

if len(firewall) > 0 {
proxy.firewall[policy.Namespace] = firewall
} else {
delete(proxy.firewall, policy.Namespace)
}
}

func (proxy *ovsProxyPlugin) watchPods() {
eventQueue := proxy.registry.RunEventQueue(Pods)

Expand Down Expand Up @@ -127,6 +181,15 @@ func (proxy *ovsProxyPlugin) unTrackPod(pod *kapi.Pod) {
}
}

func (proxy *ovsProxyPlugin) firewallBlocksIP(namespace string, ip net.IP) bool {
for _, item := range proxy.firewall[namespace] {
if item.net.Contains(ip) {
return item.policy == osapi.EgressNetworkPolicyRuleDeny
}
}
return false
}

func (proxy *ovsProxyPlugin) OnEndpointsUpdate(allEndpoints []kapi.Endpoints) {
ni, err := proxy.registry.GetNetworkInfo()
if err != nil {
Expand All @@ -145,8 +208,7 @@ EndpointLoop:
if ni.ServiceNetwork.Contains(IP) {
glog.Warningf("Service '%s' in namespace '%s' has an Endpoint inside the service network (%s)", ep.ObjectMeta.Name, ns, addr.IP)
continue EndpointLoop
}
if ni.ClusterNetwork.Contains(IP) {
} else if ni.ClusterNetwork.Contains(IP) {
podInfo, ok := proxy.getTrackedPod(addr.IP)
if !ok {
glog.Warningf("Service '%s' in namespace '%s' has an Endpoint pointing to non-existent pod (%s)", ep.ObjectMeta.Name, ns, addr.IP)
Expand All @@ -156,6 +218,11 @@ EndpointLoop:
glog.Warningf("Service '%s' in namespace '%s' has an Endpoint pointing to pod %s in namespace '%s'", ep.ObjectMeta.Name, ns, addr.IP, podInfo.ObjectMeta.Namespace)
continue EndpointLoop
}
} else {
if proxy.firewallBlocksIP(ns, IP) {
glog.Warningf("Service '%s' in namespace '%s' has an Endpoint pointing to firewalled destination (%s)", ep.ObjectMeta.Name, ns, addr.IP)
continue EndpointLoop
}
}
}
}
Expand Down
24 changes: 18 additions & 6 deletions pkg/sdn/plugin/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@ type Registry struct {
type ResourceName string

const (
Nodes ResourceName = "Nodes"
Namespaces ResourceName = "Namespaces"
NetNamespaces ResourceName = "NetNamespaces"
Services ResourceName = "Services"
HostSubnets ResourceName = "HostSubnets"
Pods ResourceName = "Pods"
Nodes ResourceName = "Nodes"
Namespaces ResourceName = "Namespaces"
NetNamespaces ResourceName = "NetNamespaces"
Services ResourceName = "Services"
HostSubnets ResourceName = "HostSubnets"
Pods ResourceName = "Pods"
EgressNetworkPolicies ResourceName = "EgressNetworkPolicies"
)

func newRegistry(osClient *osclient.Client, kClient *kclient.Client) *Registry {
Expand Down Expand Up @@ -259,6 +260,14 @@ func (registry *Registry) getServices(namespace string) ([]kapi.Service, error)
return servList, nil
}

func (registry *Registry) GetEgressNetworkPolicies() ([]osapi.EgressNetworkPolicy, error) {
policyList, err := registry.oClient.EgressNetworkPolicies(kapi.NamespaceAll).List(kapi.ListOptions{})
if err != nil {
return nil, err
}
return policyList.Items, nil
}

// Run event queue for the given resource
func (registry *Registry) RunEventQueue(resourceName ResourceName) *oscache.EventQueue {
var client cache.Getter
Expand All @@ -283,6 +292,9 @@ func (registry *Registry) RunEventQueue(resourceName ResourceName) *oscache.Even
case Pods:
expectedType = &kapi.Pod{}
client = registry.kClient
case EgressNetworkPolicies:
expectedType = &osapi.EgressNetworkPolicy{}
client = registry.oClient
default:
log.Fatalf("Unknown resource %s during initialization of event queue", resourceName)
}
Expand Down
Loading

0 comments on commit 2f0255e

Please sign in to comment.