diff --git a/cmd/evicter/README.md b/cmd/evicter/README.md index bdedf24..30d6a76 100644 --- a/cmd/evicter/README.md +++ b/cmd/evicter/README.md @@ -1,6 +1,6 @@ # Evict tainted pods after period -Operator that runs within k8s to find and evict tainted pods +Operator that runs within k8s to find and evict `tainted` pods. -* `k-rail/tainted-timestamp` -* `k-rail/tainted-prevent-eviction` -* `k-rail/reason` +* `k-rail/tainted-timestamp` stores the unix timestamp when the root event happened +* `k-rail/tainted-prevent-eviction` is a break-glass annotation to prevent automated eviction +* `k-rail/tainted-reason` is intended for humans diff --git a/cmd/evicter/controller.go b/cmd/evicter/controller.go index 397bff6..e44334a 100644 --- a/cmd/evicter/controller.go +++ b/cmd/evicter/controller.go @@ -15,7 +15,7 @@ import ( ) type podProvisioner interface { - Evict(pod *v1.Pod, reason string) error + Evict(pod *v1.Pod, reason, msg string) error } type Controller struct { @@ -50,11 +50,16 @@ func (c *Controller) processNextItem() bool { } const ( + // annotationPreventEviction is a break-glass annotation to prevent automated eviction annotationPreventEviction = "k-rail/tainted-prevent-eviction" + // annotationTimestamp stores the unix timestamp when the root event happened annotationTimestamp = "k-rail/tainted-timestamp" + // annotationReason is used to define any additional reason in a human readable form annotationReason = "k-rail/tainted-reason" ) -const defaultEvictionReason = "exec" + +const defaultEvictionReason = "Tainted" +const noEvictionNote = "Pod was marked as tainted" // evictPod is the business logic of the controller. it checks the the eviction rules and conditions before calling the pod provisioner. 
func (c *Controller) evictPod(key string) error { @@ -73,12 +78,11 @@ func (c *Controller) evictPod(key string) error { return nil } - reason, ok := pod.Annotations[annotationReason] - if !ok || reason == "" { - reason = defaultEvictionReason + msg, ok := pod.Annotations[annotationReason] + if !ok || msg == "" { + msg = noEvictionNote } - - return c.podProvisioner.Evict(pod, reason) + return c.podProvisioner.Evict(pod, defaultEvictionReason, msg) } func canEvict(pod *v1.Pod, incubationPeriod time.Duration) bool { @@ -138,7 +142,7 @@ func (c *Controller) handleErr(err error, key interface{}) { const reconciliationTick = 30 * time.Second const startupGracePeriod = 90 * time.Second -func (c *Controller) Run(threadiness int, stopCh chan struct{}) { +func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) { defer runtime.HandleCrash() // Let the workers stop when we are done diff --git a/cmd/evicter/main.go b/cmd/evicter/main.go index 04eaaf9..2f55c02 100644 --- a/cmd/evicter/main.go +++ b/cmd/evicter/main.go @@ -1,22 +1,34 @@ package main import ( + "context" "flag" + "os" + "os/signal" + "syscall" + "time" + "github.com/cruise-automation/k-rail/resource" + "github.com/golang/glog" + "github.com/google/uuid" "github.com/pkg/errors" v1 "k8s.io/api/core/v1" policy "k8s.io/api/policy/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" + corev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/kubernetes/typed/policy/v1beta1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" - "k8s.io/client-go/tools/events" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/klog" ) + func main() { var ( kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file: `/.kube/config`") @@ -24,6 +36,9 @@ func 
main() { labelSelector = flag.String("label-selector", "tainted=true", "label selector to discover tainted pods") terminationGracePeriodSeconds = flag.Int64("termination-grace-period", 30, "pod termination grace period in seconds") taintedIncubationPeriodSeconds = flag.Int64("incubation-period", 24*60*60, "time in seconds a tainted pod can run before eviction") + instanceID = flag.String("instance", uuid.New().String(), "the unique holder identity. used for leader lock") + leaseLockName = flag.String("lease-lock-name", "k-rail-evicter-lock", "the lease lock resource name") + leaseLockNamespace = flag.String("lease-lock-namespace", "k-rail", "the lease lock resource namespace") ) flag.Parse() flag.Set("logtostderr", "true") // glog: no disk log @@ -37,59 +52,105 @@ func main() { if err != nil { klog.Fatal(err) } - podListWatcher := cache.NewFilteredListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", metav1.NamespaceDefault, - func(options *metav1.ListOptions) { - options.LabelSelector = *labelSelector - }) - - queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) - // Bind the workqueue to a cache with the help of an informer. This way we make sure that - // whenever the cache is updated, the pod key is added to the workqueue. - // Note that when we finally process the item from the workqueue, we might see a newer version - // of the Pod than the version which was responsible for triggering the update. 
- indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil { - queue.Add(key) - } - }, - UpdateFunc: func(old interface{}, new interface{}) { - if key, err := cache.MetaNamespaceKeyFunc(new); err == nil { - queue.Add(key) - } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + watchSigTerm(cancel) + + br := record.NewBroadcaster() + br.StartRecordingToSink(&corev1.EventSinkImpl{Interface: clientset.CoreV1().Events(metav1.NamespaceAll)}) + br.StartLogging(glog.Infof) + defer br.Shutdown() + eventRecorder := br.NewRecorder(scheme.Scheme, v1.EventSource{Component: "k-rail-evicter"}) + + lock := &resourcelock.LeaseLock{ + LeaseMeta: metav1.ObjectMeta{ + Name: *leaseLockName, + Namespace: *leaseLockNamespace, }, - DeleteFunc: func(obj interface{}) { - // IndexerInformer uses a delta queue, therefore for deletes we have to use this key function. 
- if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err == nil { - queue.Add(key) - } + Client: clientset.CoordinationV1(), + LockConfig: resourcelock.ResourceLockConfig{ + Identity: *instanceID, + EventRecorder: eventRecorder, }, - }, cache.Indexers{}) + } - stop := make(chan struct{}) - defer close(stop) + // start the leader election code loop + leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ + Lock: lock, + ReleaseOnCancel: true, + LeaseDuration: 60 * time.Second, + RenewDeadline: 15 * time.Second, + RetryPeriod: 5 * time.Second, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(ctx context.Context) { + klog.Infof("Starting leader: %s", *instanceID) + podListWatcher := cache.NewFilteredListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, + func(options *metav1.ListOptions) { + options.LabelSelector = *labelSelector + }) - eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: clientset.EventsV1beta1().Events("")}) - eventBroadcaster.StartRecordingToSink(stop) - defer eventBroadcaster.Shutdown() + queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) - evicter := newPodEvicter(clientset.PolicyV1beta1(), eventBroadcaster.NewRecorder(scheme.Scheme, "k-rail-evicter"), *terminationGracePeriodSeconds) - controller := NewController(queue, indexer, informer, evicter, *taintedIncubationPeriodSeconds) + // Bind the workqueue to a cache with the help of an informer. This way we make sure that + // whenever the cache is updated, the pod key is added to the workqueue. + // Note that when we finally process the item from the workqueue, we might see a newer version + // of the Pod than the version which was responsible for triggering the update. 
+ indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil { + queue.Add(key) + } + }, + UpdateFunc: func(old interface{}, new interface{}) { + if key, err := cache.MetaNamespaceKeyFunc(new); err == nil { + queue.Add(key) + } + }, + DeleteFunc: func(obj interface{}) { + // IndexerInformer uses a delta queue, therefore for deletes we have to use this key function. + if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err == nil { + queue.Add(key) + } + }, + }, cache.Indexers{}) - go controller.Run(1, stop) + evicter := newPodEvicter(clientset.PolicyV1beta1(), eventRecorder, *terminationGracePeriodSeconds) + controller := NewController(queue, indexer, informer, evicter, *taintedIncubationPeriodSeconds) + controller.Run(1, ctx.Done()) + }, + OnStoppedLeading: func() { + // we can do cleanup here + klog.Infof("Leader lost: %s", *instanceID) + os.Exit(0) + }, + OnNewLeader: func(identity string) { + if identity == *instanceID { + return + } + klog.Infof("New leader elected: %s", identity) + }, + }, + }) +} - // todo: watch sigterm - // todo: recover panic to log - select {} +func watchSigTerm(cancel context.CancelFunc) { + ch := make(chan os.Signal, 1) + signal.Notify(ch, os.Interrupt, syscall.SIGTERM) + go func() { + <-ch + klog.Info("Received termination, signaling shutdown") + cancel() + }() } type podEvicter struct { client v1beta1.PolicyV1beta1Interface - eventRecorder events.EventRecorder + eventRecorder record.EventRecorder defaultDeleteOptions *metav1.DeleteOptions } -func newPodEvicter(client v1beta1.PolicyV1beta1Interface, recorder events.EventRecorder, gracePeriodSeconds int64) *podEvicter { +func newPodEvicter(client v1beta1.PolicyV1beta1Interface, recorder record.EventRecorder, gracePeriodSeconds int64) *podEvicter { return &podEvicter{ client: client, eventRecorder: recorder, @@ -97,12 +158,14 @@ 
func newPodEvicter(client v1beta1.PolicyV1beta1Interface, recorder events.EventR } } -func (p *podEvicter) Evict(pod *v1.Pod, reason string) error { +// Evict calls the k8s api to evict the given pod. Reason and notes are stored with the audit event. +func (p *podEvicter) Evict(pod *v1.Pod, reason, msg string) error { err := p.client.Evictions(pod.Namespace).Evict(newEviction(pod, p.defaultDeleteOptions)) if err != nil { return errors.Wrap(err, "eviction") } + klog.Infof("Evicted pod %q (UID: %s)", resource.GetResourceName(pod.ObjectMeta), pod.UID) - p.eventRecorder.Eventf(pod, nil, v1.EventTypeNormal, reason, "Eviction", "") + p.eventRecorder.Event(pod, v1.EventTypeNormal, reason, msg) return nil } diff --git a/go.mod b/go.mod index bc92e84..ed1d84e 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,8 @@ go 1.12 require ( github.com/gobwas/glob v0.2.3 + github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b + github.com/google/uuid v1.1.1 github.com/googleapis/gnostic v0.3.1 // indirect github.com/gorilla/mux v1.7.0 github.com/imdario/mergo v0.3.8 // indirect diff --git a/go.sum b/go.sum index adaf2c9..22b1332 100644 --- a/go.sum +++ b/go.sum @@ -22,6 +22,7 @@ github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZ github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= +github.com/evanphx/json-patch v4.2.0+incompatible h1:fUDGZCv/7iAN7u0puUVhvKCcsR6vRfwrJatElLBEf0I= github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/fsnotify/fsnotify v1.4.7 
h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= @@ -35,6 +36,7 @@ github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y= github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d h1:3PaI8p3seN09VjbTYC/QWlUZdZ1qS1zGjy7LH2Wt07I= github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903 h1:LbsanbbD6LieFkXbj9YNNBupiGHJgFeLpO0j0Fza1h8= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=