Skip to content

Commit

Permalink
Support EgressFirewall in SDN plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
danwinship committed Jul 11, 2016
1 parent 4cc6b66 commit 8a49b98
Show file tree
Hide file tree
Showing 5 changed files with 264 additions and 9 deletions.
158 changes: 157 additions & 1 deletion pkg/sdn/plugin/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@ import (
"github.com/openshift/origin/pkg/util/ovs"

kapi "k8s.io/kubernetes/pkg/api"
kerrors "k8s.io/kubernetes/pkg/api/errors"
kexec "k8s.io/kubernetes/pkg/util/exec"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sysctl"
utilwait "k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
)

const (
Expand All @@ -26,6 +30,7 @@ const (
VERSION_TABLE = "table=253"
VERSION_ACTION = "actions=note:"

// OVS-related interfaces
BR = "br0"
LBR = "lbr0"
TUN = "tun0"
Expand All @@ -34,6 +39,10 @@ const (
VXLAN = "vxlan0"

VXLAN_PORT = "4789"

// Egress firewall tables
FW_TABLE_LOW = 10
FW_TABLE_HIGH = 11
)

func getPluginVersion(multitenant bool) []string {
Expand Down Expand Up @@ -255,7 +264,7 @@ func (plugin *OsdnNode) SetupSDN(localSubnetCIDR, clusterNetworkCIDR, servicesNe
otx.AddFlow("table=5, priority=200, ip, nw_dst=%s, actions=goto_table:7", localSubnetCIDR)
otx.AddFlow("table=5, priority=100, arp, nw_dst=%s, actions=goto_table:8", clusterNetworkCIDR)
otx.AddFlow("table=5, priority=100, ip, nw_dst=%s, actions=goto_table:8", clusterNetworkCIDR)
otx.AddFlow("table=5, priority=0, ip, actions=output:2")
otx.AddFlow("table=5, priority=0, ip, actions=goto_table:9")
otx.AddFlow("table=5, priority=0, arp, actions=drop")

// Table 6: ARP to container, filled in by openshift-sdn-ovs
Expand All @@ -272,6 +281,20 @@ func (plugin *OsdnNode) SetupSDN(localSubnetCIDR, clusterNetworkCIDR, servicesNe
// eg, "table=8, priority=100, ip, nw_dst=${remote_subnet_cidr}, actions=move:NXM_NX_REG0[]->NXM_NX_TUN_ID[0..31], set_field:${remote_node_ip}->tun_dst,output:1"
otx.AddFlow("table=8, priority=0, actions=drop")

// Table 9: egress firewall dispatch; edited by UpdateEgressFirewall()
// eg, "table=9, priority=0, actions=goto_table:10
otx.AddFlow("table=9, priority=0, actions=output:2")

// Table 10: egress firewall #1 (FW_TABLE_LOW); filled in by UpdateEgressFirewall()
// eg, "table=10, priority=10001, ip, reg0=${tenant_id}, nw_dst=${foreign_subnet_cidr}, actions=output:2"
// eg, "table=10, priority=10000, ip, nw_dst=${foreign_subnet_cidr}, actions=drop"
otx.AddFlow("table=10, priority=0, actions=output:2")

// Table 11: egress firewall #2 (FW_TABLE_HIGH); filled in by UpdateEgressFirewall()
// eg, "table=11, priority=10001, ip, reg0=${tenant_id}, nw_dst=${foreign_subnet_cidr}, actions=output:2"
// eg, "table=11, priority=10000, ip, nw_dst=${foreign_subnet_cidr}, actions=drop"
otx.AddFlow("table=11, priority=0, actions=output:2")

err = otx.EndTransaction()
if err != nil {
return false, err
Expand Down Expand Up @@ -330,6 +353,139 @@ func (plugin *OsdnNode) SetupSDN(localSubnetCIDR, clusterNetworkCIDR, servicesNe
return true, nil
}

func (plugin *OsdnNode) SetupEgressFirewall() error {
plugin.curFwTable = -1
fw, err := plugin.registry.GetEgressFirewall()
if err == nil {
plugin.fwRules = fw.Rules
} else {
if kerrors.IsNotFound(err) {
plugin.fwRules = nil
} else {
return fmt.Errorf("Could not get EgressFirewall: %s", err)
}
}

err = plugin.UpdateEgressFirewall()
if err != nil {
return err
}
go utilwait.Forever(plugin.watchEgressFirewall, 0)
return nil
}

func (plugin *OsdnNode) watchEgressFirewall() {
eventQueue := plugin.registry.RunEventQueue(EgressFirewalls)

for {
eventType, obj, err := eventQueue.Pop()
if err != nil {
utilruntime.HandleError(fmt.Errorf("EventQueue failed for EgressFirewall: %v", err))
return
}
fw := obj.(*osapi.EgressFirewall)
if fw.Name != "default" {
glog.Warningf("Ignoring invalid EgressFirewall %q", fw.Name)
}

if eventType == watch.Added || eventType == watch.Modified {
if len(fw.Rules) > 10000 {
glog.Warningf("Too many EgressFirewall rules (%d). Ignoring", len(fw.Rules))
}
plugin.fwRules = fw.Rules
} else {
plugin.fwRules = nil
}
err = plugin.UpdateEgressFirewall()
if err != nil {
utilruntime.HandleError(err)
return
}
}
}

func (plugin *OsdnNode) UpdateEgressFirewall() error {
var newFwTable int

otx := ovs.NewTransaction(kexec.New(), BR)

if len(plugin.fwRules) == 0 {
// No rules: update table 9 to just accept traffic immediately
newFwTable = -1
otx.ModFlows("table=9, priority=0, actions=output:2")
} else {
// Figure out which of table 10 and 11 we should use, clean it out,
// fill in the rules, and then update table 9 to point to that table.
// This means that if anything fails, we continue using the old
// firewall rules, and if everything is OK, then we transition from
// the old rules to the new one atomically when we change the table 9
// rule.

if plugin.curFwTable == FW_TABLE_LOW {
newFwTable = FW_TABLE_HIGH
} else {
newFwTable = FW_TABLE_LOW
}
otx.DeleteFlows("table=%d", newFwTable)

for i, rule := range plugin.fwRules {
priority := len(plugin.fwRules) - i

var action string
if rule.Policy == osapi.EgressFirewallPolicyAllow {
action = "output:2"
} else {
action = "drop"
}

var src string
if rule.From == "" {
src = ""
} else {
if !plugin.multitenant {
glog.Errorf("Cannot implement namespaced EgressFirewallRule (%s from %q to %s) with single-tenant plugin", string(rule.Policy), rule.From, rule.To)
continue
}
vnid, err := plugin.getVNID(rule.From)
if err != nil {
glog.Warningf("Could not find namespace %q for EgressFirewallRule: %s from %q to %s: %s", rule.From, string(rule.Policy), rule.From, rule.To, err)
continue
}
src = fmt.Sprintf(", reg0=%s", vnid)
}

var dst string
if rule.To == "0.0.0.0/32" {
dst = ""
} else {
dst = fmt.Sprintf(", nw_dst=%s", rule.To)
}

otx.AddFlow("table=%d, priority=%d, ip%s%s, actions=%s", newFwTable, priority, src, dst, action)
}
otx.AddFlow("table=%d, priority=0, actions=output:2", newFwTable)

otx.ModFlows("table=9, priority=0, actions=goto_table:%d", newFwTable)
}

err := otx.EndTransaction()
if err != nil {
return fmt.Errorf("Error updating OVS flows for EgressFirewall: %v", err)
}

if plugin.curFwTable != -1 {
otx := ovs.NewTransaction(kexec.New(), BR)
otx.DeleteFlows("table=%d", plugin.curFwTable)
err := otx.EndTransaction()
if err != nil {
glog.Warningf("Error cleaning up old OVS flows for EgressFirewall: %v", err)
// Not fatal since we'll try again before we actually use this table again
}
}
plugin.curFwTable = newFwTable
return nil
}

func (plugin *OsdnNode) AddHostSubnetRules(subnet *osapi.HostSubnet) error {
glog.Infof("AddHostSubnetRules for %s", hostSubnetToString(subnet))
otx := ovs.NewTransaction(kexec.New(), BR)
Expand Down
7 changes: 7 additions & 0 deletions pkg/sdn/plugin/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type OsdnNode struct {
vnids *vnidMap
iptablesSyncPeriod time.Duration
mtu uint
curFwTable int
fwRules []osapi.EgressFirewallRule
}

// Called by higher layers to create the plugin SDN node instance
Expand Down Expand Up @@ -71,6 +73,7 @@ func NewNodePlugin(pluginName string, osClient *osclient.Client, kClient *kclien
podNetworkReady: make(chan struct{}),
iptablesSyncPeriod: iptablesSyncPeriod,
mtu: mtu,
curFwTable: -1,
}
return plugin, nil
}
Expand Down Expand Up @@ -98,6 +101,10 @@ func (node *OsdnNode) Start() error {
}
}

if err := node.SetupEgressFirewall(); err != nil {
return err
}

if networkChanged {
var pods []kapi.Pod
pods, err = node.GetLocalPods(kapi.NamespaceAll)
Expand Down
77 changes: 75 additions & 2 deletions pkg/sdn/plugin/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import (
"github.com/golang/glog"

osclient "github.com/openshift/origin/pkg/client"
osapi "github.com/openshift/origin/pkg/sdn/api"
"github.com/openshift/origin/pkg/sdn/plugin/api"

kapi "k8s.io/kubernetes/pkg/api"
kerrors "k8s.io/kubernetes/pkg/api/errors"
kclient "k8s.io/kubernetes/pkg/client/unversioned"
pconfig "k8s.io/kubernetes/pkg/proxy/config"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
Expand All @@ -23,6 +25,8 @@ type ovsProxyPlugin struct {
registry *Registry
podsByIP map[string]*kapi.Pod
podsMutex sync.Mutex
fwRules []osapi.EgressFirewallRule
fwNets []*net.IPNet

baseEndpointsHandler pconfig.EndpointsConfigHandler
}
Expand Down Expand Up @@ -50,15 +54,68 @@ func (proxy *ovsProxyPlugin) Start(baseHandler pconfig.EndpointsConfigHandler) e
return err
}

fw, err := proxy.registry.GetEgressFirewall()
if err == nil {
proxy.setFwRules(fw.Rules)
} else {
if kerrors.IsNotFound(err) {
proxy.setFwRules(nil)
} else {
return fmt.Errorf("Could not get EgressFirewall: %s", err)
}
}

for _, pod := range pods {
proxy.trackPod(&pod)
}

go utilwait.Forever(proxy.watchPods, 0)
go utilwait.Forever(proxy.watchEgressFirewall, 0)

return nil
}

func (proxy *ovsProxyPlugin) watchEgressFirewall() {
eventQueue := proxy.registry.RunEventQueue(EgressFirewalls)

for {
eventType, obj, err := eventQueue.Pop()
if err != nil {
utilruntime.HandleError(fmt.Errorf("EventQueue failed for EgressFirewall: %v", err))
return
}
fw := obj.(*osapi.EgressFirewall)
if fw.Name != "default" {
glog.Warningf("Ignoring invalid EgressFirewall %q", fw.Name)
}

if eventType == watch.Added || eventType == watch.Modified {
if len(fw.Rules) > 10000 {
glog.Warningf("Too many EgressFirewall rules (%d). Ignoring", len(fw.Rules))
}
proxy.setFwRules(fw.Rules)
} else {
proxy.setFwRules(nil)
}
// FIXME: poke the endpoint-syncer somehow...
}
}

func (proxy *ovsProxyPlugin) setFwRules(fwRules []osapi.EgressFirewallRule) {
nets := make([]*net.IPNet, len(fwRules))
for i := range fwRules {
_, cidr, err := net.ParseCIDR(fwRules[i].To)
if err != nil {
// should have been caught by validation
glog.Errorf("Illegal CIDR value %q in EgressFirewall rule", fwRules[i].To)
return
}
nets[i] = cidr
}
proxy.fwRules = fwRules
proxy.fwNets = nets
}

func (proxy *ovsProxyPlugin) watchPods() {
eventQueue := proxy.registry.RunEventQueue(Pods)

Expand Down Expand Up @@ -127,6 +184,18 @@ func (proxy *ovsProxyPlugin) unTrackPod(pod *kapi.Pod) {
}
}

func (proxy *ovsProxyPlugin) firewallBlocksIP(namespace string, ip net.IP) bool {
for i, rule := range proxy.fwRules {
if rule.From != "" && rule.From != namespace {
continue
}
if proxy.fwNets[i].Contains(ip) {
return rule.Policy == osapi.EgressFirewallPolicyDeny
}
}
return false
}

func (proxy *ovsProxyPlugin) OnEndpointsUpdate(allEndpoints []kapi.Endpoints) {
ni, err := proxy.registry.GetNetworkInfo()
if err != nil {
Expand All @@ -145,8 +214,7 @@ EndpointLoop:
if ni.ServiceNetwork.Contains(IP) {
glog.Warningf("Service '%s' in namespace '%s' has an Endpoint inside the service network (%s)", ep.ObjectMeta.Name, ns, addr.IP)
continue EndpointLoop
}
if ni.ClusterNetwork.Contains(IP) {
} else if ni.ClusterNetwork.Contains(IP) {
podInfo, ok := proxy.getTrackedPod(addr.IP)
if !ok {
glog.Warningf("Service '%s' in namespace '%s' has an Endpoint pointing to non-existent pod (%s)", ep.ObjectMeta.Name, ns, addr.IP)
Expand All @@ -156,6 +224,11 @@ EndpointLoop:
glog.Warningf("Service '%s' in namespace '%s' has an Endpoint pointing to pod %s in namespace '%s'", ep.ObjectMeta.Name, ns, addr.IP, podInfo.ObjectMeta.Namespace)
continue EndpointLoop
}
} else {
if proxy.firewallBlocksIP(ns, IP) {
glog.Warningf("Service '%s' in namespace '%s' has an Endpoint pointing to firewalled destination (%s)", ep.ObjectMeta.Name, ns, addr.IP)
continue EndpointLoop
}
}
}
}
Expand Down
Loading

0 comments on commit 8a49b98

Please sign in to comment.