Skip to content

Commit

Permalink
add configmap based config
Browse files Browse the repository at this point in the history
  • Loading branch information
cadmuxe committed Nov 12, 2019
1 parent 5d2e3fa commit 6af37aa
Show file tree
Hide file tree
Showing 19 changed files with 575 additions and 107 deletions.
43 changes: 26 additions & 17 deletions cmd/glbc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"k8s.io/klog"

crdclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
Expand Down Expand Up @@ -79,13 +78,7 @@ func main() {
if err != nil {
klog.Fatalf("Failed to create kubernetes client: %v", err)
}
var dynamicClient dynamic.Interface
if flags.F.EnableCSM {
dynamicClient, err = dynamic.NewForConfig(kubeConfig)
if err != nil {
klog.Fatalf("Failed to create kubernetes dynamic client: %v", err)
}
}

// Due to scaling issues, leader election must be configured with a separate k8s client.
leaderElectKubeClient, err := kubernetes.NewForConfig(restclient.AddUserAgent(kubeConfig, "leader-election"))
if err != nil {
Expand Down Expand Up @@ -139,28 +132,29 @@ func main() {
HealthCheckPath: flags.F.HealthCheckPath,
DefaultBackendHealthCheckPath: flags.F.DefaultSvcHealthCheckPath,
FrontendConfigEnabled: flags.F.EnableFrontendConfig,
EnableCSM: flags.F.EnableCSM,
EnableASMConfigMap: flags.F.EnableASMConfigMapBasedConfig,
ASMConfigMapNamespace: flags.F.ASMConfigMapBasedConfigNamespace,
ASMConfigMapName: flags.F.ASMConfigMapBasedConfigCMName,
}
ctx := ingctx.NewControllerContext(kubeClient, dynamicClient, backendConfigClient, frontendConfigClient, cloud, namer, ctxConfig)
ctx := ingctx.NewControllerContext(kubeConfig, kubeClient, backendConfigClient, frontendConfigClient, cloud, namer, ctxConfig)
go app.RunHTTPServer(ctx.HealthCheck)

if !flags.F.LeaderElection.LeaderElect {
runControllers(ctx)
return
}

electionConfig, err := makeLeaderElectionConfig(leaderElectKubeClient, ctx.Recorder(flags.F.LeaderElection.LockObjectNamespace), func() {
runControllers(ctx)
})
electionConfig, err := makeLeaderElectionConfig(ctx, leaderElectKubeClient, ctx.Recorder(flags.F.LeaderElection.LockObjectNamespace))
if err != nil {
klog.Fatalf("%v", err)
}
leaderelection.RunOrDie(context.Background(), *electionConfig)
klog.Warning("Ingress Controller exited.")
}

// makeLeaderElectionConfig builds a leader election configuration. It will
// create a new resource lock associated with the configuration.
func makeLeaderElectionConfig(client clientset.Interface, recorder record.EventRecorder, run func()) (*leaderelection.LeaderElectionConfig, error) {
func makeLeaderElectionConfig(ctx *ingctx.ControllerContext, client clientset.Interface, recorder record.EventRecorder) (*leaderelection.LeaderElectionConfig, error) {
hostname, err := os.Hostname()
if err != nil {
return nil, fmt.Errorf("unable to get hostname: %v", err)
Expand All @@ -180,6 +174,12 @@ func makeLeaderElectionConfig(client clientset.Interface, recorder record.EventR
return nil, fmt.Errorf("couldn't create resource lock: %v", err)
}

run := func() {
runControllers(ctx)
klog.Info("Shutting down leader election")
os.Exit(0)
}

return &leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: flags.F.LeaderElection.LeaseDuration.Duration,
Expand All @@ -192,20 +192,26 @@ func makeLeaderElectionConfig(client clientset.Interface, recorder record.EventR
run()
},
OnStoppedLeading: func() {
klog.Fatalf("lost master")
klog.Warning("lost master")
},
},
}, nil
}

func runControllers(ctx *ingctx.ControllerContext) {
stopCh := make(chan struct{})
ctx.Init()
lbc := controller.NewLoadBalancerController(ctx, stopCh)
if ctx.EnableASMConfigMap {
ctx.ASMConfigController.RegisterInformer(ctx.ConfigMapInformer, func() {
lbc.Stop(false) // We want to trigger a restart, don't have to clean up all the resources.
})
}

fwc := firewalls.NewFirewallController(ctx, flags.F.NodePortRanges.Values())

// TODO: Refactor NEG to use cloud mocks so ctx.Cloud can be referenced within NewController.
negController := neg.NewController(negtypes.NewAdapter(ctx.Cloud), ctx, lbc.Translator, ctx.ClusterNamer, flags.F.ResyncPeriod, flags.F.NegGCPeriod, flags.F.EnableReadinessReflector, flags.F.EnableCSM, flags.F.CSMServiceNEGSkipNamespaces)
negController := neg.NewController(negtypes.NewAdapter(ctx.Cloud), ctx, lbc.Translator, ctx.ClusterNamer, flags.F.ResyncPeriod, flags.F.NegGCPeriod, flags.F.EnableReadinessReflector)

go negController.Run(stopCh)
klog.V(0).Infof("negController started")
Expand All @@ -220,7 +226,10 @@ func runControllers(ctx *ingctx.ControllerContext) {
lbc.Run()

for {
klog.Infof("Handled quit, awaiting pod deletion.")
klog.Warning("Handled quit, awaiting pod deletion.")
time.Sleep(30 * time.Second)
if ctx.EnableASMConfigMap {
return
}
}
}
8 changes: 8 additions & 0 deletions docs/deploy/resources/configmap-based-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: ingress-controller-asm-cm-config
namespace: kube-system
data:
enable-asm: "false"
asm-skip-namespaces: "kube-system,istio-system"
54 changes: 54 additions & 0 deletions pkg/cmconfig/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package cmconfig

import (
"fmt"
"reflect"
"strings"

utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

// Config holds configmap based configurations.
type Config struct {
EnableASM bool
ASMServiceNEGSkipNamespaces []string
}

const (
trueValue = "true"
falseValue = "false"

enableASM = "enable-asm"
asmSkipNamespaces = "asm-skip-namespaces"
)

// NewConfig returns a Conifg instances with default values.
func NewConfig() Config {
return Config{ASMServiceNEGSkipNamespaces: []string{"kube-system"}}
}

// Equals returns true if c equals to other.
func (c *Config) Equals(other *Config) bool {
return reflect.DeepEqual(c, other)
}

// LoadValue loads configs from a map, it will ignore any unknow/unvalid field.
func (c *Config) LoadValue(m map[string]string) error {
var errList []error
for k, v := range m {
if k == enableASM {
if v == trueValue {
c.EnableASM = true
} else if v == falseValue {
c.EnableASM = false
} else {
errList = append(errList, fmt.Errorf("The map provided a unvalid value for field: %s, value: %s, valid values are: %s/%s", k, v, trueValue, falseValue))
}
} else if k == asmSkipNamespaces {
c.ASMServiceNEGSkipNamespaces = strings.Split(v, ",")
} else {
errList = append(errList, fmt.Errorf("The map contains a unknown key-value pair: %s:%s", k, v))
}
}
return utilerrors.NewAggregate(errList)
}
72 changes: 72 additions & 0 deletions pkg/cmconfig/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package cmconfig

import (
"reflect"
"strings"
"testing"
)

func TestLoadValue(t *testing.T) {
testcases := []struct {
desc string
inputMap map[string]string
wantConfig Config
wantLog string
}{
{
desc: "empty map should give default config",
inputMap: map[string]string{},
wantConfig: NewConfig(),
wantLog: "",
},
{
desc: "LoadValue should load values from a valid map",
inputMap: map[string]string{"enable-asm": "true", "asm-skip-namespaces": "name-space1,namespace2"},
wantConfig: Config{EnableASM: true, ASMServiceNEGSkipNamespaces: []string{"name-space1", "namespace2"}},
wantLog: "",
},
{
desc: "LoadValue should return the default value if EnableASM has a unvalid value.",
inputMap: map[string]string{"enable-asm": "f"},
wantConfig: Config{EnableASM: false, ASMServiceNEGSkipNamespaces: []string{"kube-system"}},
wantLog: "The map provided a unvalid value for field: enable-asm, value: f, valid values are: true/false",
},
{
desc: "LoadValue should be tolerant for unknow field.",
inputMap: map[string]string{"A": "B"},
wantConfig: NewConfig(),
wantLog: "The map contains a unknown key-value pair: A:B",
},
}

for _, tc := range testcases {
t.Run(tc.desc, func(t *testing.T) {
config := NewConfig()
err := config.LoadValue(tc.inputMap)
if !config.Equals(&tc.wantConfig) {
t.Errorf("LoadValue loads wrong value, got: %v, want: %v", config, tc.wantConfig)
}
if tc.wantLog != "" {
if !strings.Contains(err.Error(), tc.wantLog) {
t.Errorf("LoadValue logs don't contain wanted log, got: %s, want: %s", err.Error(), tc.wantLog)
}
}
})
}
}

func TestConfigTag(t *testing.T) {
configType := reflect.TypeOf(Config{})
for i := 0; i < configType.NumField(); i++ {
field := configType.Field(i)
fieldType := field.Type.Kind()
if fieldType != reflect.Bool && fieldType != reflect.String && fieldType != reflect.Slice {
t.Errorf("Struct config contains filed with unknown type: %s, only supports: %s/%s/[]string types", fieldType, reflect.Bool.String(), reflect.String.String())
}
if fieldType == reflect.Slice {
if field.Type.Elem().Kind() != reflect.String {
t.Errorf("Struct config contains slice filed with unknown type: %s, only supports []string slice", field.Type.Elem().Kind())
}
}
}
}
117 changes: 117 additions & 0 deletions pkg/cmconfig/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package cmconfig

import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
)

// ConfigMapConfigController is the ConfigMap based config controller.
// If cmConfigModeEnabled set to true, it will load the config from configmap: configMapNamespace/configMapName and restart ingress controller if the config has any ligeal changes.
// If cmConfigModeEnabled set to false, it will return the default values for the configs.
type ConfigMapConfigController struct {
configMapNamespace string
configMapName string
currentConfig *Config
currentConfigMapObject *v1.ConfigMap
kubeClient kubernetes.Interface
recorder record.EventRecorder
}

// NewConfigMapConfigController creates a new ConfigMapConfigController, it will load the config from the target configmap
func NewConfigMapConfigController(kubeClient kubernetes.Interface, recorder record.EventRecorder, configMapNamespace, configMapName string) *ConfigMapConfigController {

currentConfig := NewConfig()
cm, err := kubeClient.CoreV1().ConfigMaps(configMapNamespace).Get(configMapName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
klog.Infof("ConfigMapConfigController: Not found the configmap based config, using default config: %v", currentConfig)
} else {
klog.Warningf("ConfigMapConfigController failed to load config from api server, using the defualt config. Error: %v", err)
}
} else {
if err := currentConfig.LoadValue(cm.Data); err != nil {
if recorder != nil {
recorder.Event(cm, "Warning", "LoadValueError", err.Error())
}
klog.Warningf("LoadValue error: %s", err.Error())
}
klog.Infof("ConfigMapConfigController: loaded config from configmap, config %v", currentConfig)
}

c := &ConfigMapConfigController{
configMapNamespace: configMapNamespace,
configMapName: configMapName,
currentConfig: &currentConfig,
kubeClient: kubeClient,
recorder: recorder,
}
return c
}

// GetConfig returns the internal Config
func (c *ConfigMapConfigController) GetConfig() Config {
return *c.currentConfig
}

// RecordEvent records a event to the ASMConfigmap
func (c *ConfigMapConfigController) RecordEvent(eventtype, reason, message string) bool {
if c.recorder == nil || c.currentConfigMapObject == nil {
return false
}
c.recorder.Event(c.currentConfigMapObject, eventtype, reason, message)
return true
}

// RegisterInformer regjister the configmap based config controller handler to the configapInformer which will watch the target
// configmap and send stop message to the stopCh if any valid change detected.
func (c *ConfigMapConfigController) RegisterInformer(configMapInformer cache.SharedIndexInformer, cancel func()) {
configMapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
c.processItem(obj, cancel)
},
DeleteFunc: func(obj interface{}) {
c.processItem(obj, cancel)
},
UpdateFunc: func(_, cur interface{}) {
c.processItem(cur, cancel)
},
})

}

func (c *ConfigMapConfigController) processItem(obj interface{}, cancel func()) {
configMap, ok := obj.(*v1.ConfigMap)
if !ok {
klog.Errorf("ConfigMapConfigController: failed to convert informer object to ConfigMap.")
}
if configMap.Namespace != c.configMapNamespace || configMap.Name != c.configMapName {
return
}

config := NewConfig()
cm, err := c.kubeClient.CoreV1().ConfigMaps(c.configMapNamespace).Get(c.configMapName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
klog.Infof("ConfigMapConfigController: Not found the configmap based config, using default config: %v", config)
} else {
klog.Warningf("ConfigMapConfigController failed to load config from api server, using the defualt config. Error: %v", err)
}
} else {
c.currentConfigMapObject = cm
if err := config.LoadValue(cm.Data); err != nil {
c.RecordEvent("Warning", "LoadValueError", err.Error())
klog.Warningf("LoadValue error: %s", err.Error())
}
}

if !config.Equals(c.currentConfig) {
klog.Warningf("ConfigMapConfigController: Get a update on the ConfigMapConfig. Old config: %v, new config: %v. Restarting Ingress controller...", *c.currentConfig, config)
c.RecordEvent("Normal", "ASMConfigMapTiggerRestart", "ConfigMapConfigController: Get a update on the ConfigMapConfig, Restarting Ingress controller")
cancel()
}
}
Loading

0 comments on commit 6af37aa

Please sign in to comment.