Skip to content
This repository has been archived by the owner on Jan 12, 2023. It is now read-only.

Commit

Permalink
Add leader lock to evicter
Browse files Browse the repository at this point in the history
  • Loading branch information
alpe committed Feb 3, 2020
1 parent 1dd19b1 commit 00bb537
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 54 deletions.
8 changes: 4 additions & 4 deletions cmd/evicter/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Evict tainted pods after period
Operator that runs within k8s to find and evict tainted pods
Operator that runs within k8s to find and evict `tainted` pods.

* `k-rail/tainted-timestamp`
* `k-rail/tainted-prevent-eviction`
* `k-rail/reason`
* `k-rail/tainted-timestamp` stores the unix timestamp when the root event happened
* `k-rail/tainted-prevent-eviction` is a break-glass annotation to prevent automated eviction
* `k-rail/reason` intended for humans
20 changes: 12 additions & 8 deletions cmd/evicter/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

// podProvisioner abstracts the pod eviction call so the controller can be
// exercised without a live cluster. Implemented by podEvicter.
type podProvisioner interface {
	// Evict removes the given pod. reason is a short machine-readable
	// category; msg is a human-readable note stored with the audit event.
	Evict(pod *v1.Pod, reason, msg string) error
}

type Controller struct {
Expand Down Expand Up @@ -50,11 +50,16 @@ func (c *Controller) processNextItem() bool {
}

const (
	// annotationPreventEviction is a break-glass annotation to prevent automated eviction
	annotationPreventEviction = "k-rail/tainted-prevent-eviction"
	// annotationTimestamp stores the unix timestamp when the root event happened
	annotationTimestamp = "k-rail/tainted-timestamp"
	// annotationReason is used to define any additional reason in a human readable form
	annotationReason = "k-rail/tainted-reason"
)

// defaultEvictionReason is the event reason recorded for every eviction.
const defaultEvictionReason = "Tainted"

// noEvictionNote is the fallback message used when the pod carries no
// annotationReason annotation.
const noEvictionNote = "Pod was marked as tainted"

// evictPod is the business logic of the controller. It checks the eviction rules and conditions before calling the pod provisioner.
func (c *Controller) evictPod(key string) error {
Expand All @@ -73,12 +78,11 @@ func (c *Controller) evictPod(key string) error {
return nil
}

reason, ok := pod.Annotations[annotationReason]
if !ok || reason == "" {
reason = defaultEvictionReason
msg, ok := pod.Annotations[annotationReason]
if !ok || msg == "" {
msg = noEvictionNote
}

return c.podProvisioner.Evict(pod, reason)
return c.podProvisioner.Evict(pod, defaultEvictionReason, msg)
}

func canEvict(pod *v1.Pod, incubationPeriod time.Duration) bool {
Expand Down Expand Up @@ -138,7 +142,7 @@ func (c *Controller) handleErr(err error, key interface{}) {
const reconciliationTick = 30 * time.Second
const startupGracePeriod = 90 * time.Second

func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) {
defer runtime.HandleCrash()

// Let the workers stop when we are done
Expand Down
147 changes: 105 additions & 42 deletions cmd/evicter/main.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,44 @@
package main

import (
"context"
"flag"
"os"
"os/signal"
"syscall"
"time"

"github.com/cruise-automation/k-rail/resource"
"github.com/golang/glog"
"github.com/google/uuid"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/kubernetes/typed/policy/v1beta1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
)


func main() {
var (
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file: `<home>/.kube/config`")
master = flag.String("master", "", "master url")
labelSelector = flag.String("label-selector", "tainted=true", "label selector to discover tainted pods")
terminationGracePeriodSeconds = flag.Int64("termination-grace-period", 30, "pod termination grace period in seconds")
taintedIncubationPeriodSeconds = flag.Int64("incubation-period", 24*60*60, "time in seconds a tainted pod can run before eviction")
instanceID = flag.String("instance", uuid.New().String(), "the unique holder identity. used for leader lock")
leaseLockName = flag.String("lease-lock-name", "k-rail-evicter-lock", "the lease lock resource name")
leaseLockNamespace = flag.String("lease-lock-namespace", "k-rail", "the lease lock resource namespace")
)
flag.Parse()
flag.Set("logtostderr", "true") // glog: no disk log
Expand All @@ -37,72 +52,120 @@ func main() {
if err != nil {
klog.Fatal(err)
}
podListWatcher := cache.NewFilteredListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", metav1.NamespaceDefault,
func(options *metav1.ListOptions) {
options.LabelSelector = *labelSelector
})

queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
// Bind the workqueue to a cache with the help of an informer. This way we make sure that
// whenever the cache is updated, the pod key is added to the workqueue.
// Note that when we finally process the item from the workqueue, we might see a newer version
// of the Pod than the version which was responsible for triggering the update.
indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
queue.Add(key)
}
},
UpdateFunc: func(old interface{}, new interface{}) {
if key, err := cache.MetaNamespaceKeyFunc(new); err == nil {
queue.Add(key)
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
watchSigTerm(cancel)

br := record.NewBroadcaster()
br.StartRecordingToSink(&corev1.EventSinkImpl{Interface: clientset.CoreV1().Events(metav1.NamespaceAll)})
br.StartLogging(glog.Infof)
defer br.Shutdown()
eventRecorder := br.NewRecorder(scheme.Scheme, v1.EventSource{Component: "k-rail-evicter"})

lock := &resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Name: *leaseLockName,
Namespace: *leaseLockNamespace,
},
DeleteFunc: func(obj interface{}) {
// IndexerInformer uses a delta queue, therefore for deletes we have to use this key function.
if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err == nil {
queue.Add(key)
}
Client: clientset.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: *instanceID,
EventRecorder: eventRecorder,
},
}, cache.Indexers{})
}

stop := make(chan struct{})
defer close(stop)
// start the leader election code loop
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
ReleaseOnCancel: true,
LeaseDuration: 60 * time.Second,
RenewDeadline: 15 * time.Second,
RetryPeriod: 5 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
klog.Infof("Starting leader: %s", *instanceID)
podListWatcher := cache.NewFilteredListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", metav1.NamespaceAll,
func(options *metav1.ListOptions) {
options.LabelSelector = *labelSelector
})

eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: clientset.EventsV1beta1().Events("")})
eventBroadcaster.StartRecordingToSink(stop)
defer eventBroadcaster.Shutdown()
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

evicter := newPodEvicter(clientset.PolicyV1beta1(), eventBroadcaster.NewRecorder(scheme.Scheme, "k-rail-evicter"), *terminationGracePeriodSeconds)
controller := NewController(queue, indexer, informer, evicter, *taintedIncubationPeriodSeconds)
// Bind the workqueue to a cache with the help of an informer. This way we make sure that
// whenever the cache is updated, the pod key is added to the workqueue.
// Note that when we finally process the item from the workqueue, we might see a newer version
// of the Pod than the version which was responsible for triggering the update.
indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
queue.Add(key)
}
},
UpdateFunc: func(old interface{}, new interface{}) {
if key, err := cache.MetaNamespaceKeyFunc(new); err == nil {
queue.Add(key)
}
},
DeleteFunc: func(obj interface{}) {
// IndexerInformer uses a delta queue, therefore for deletes we have to use this key function.
if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err == nil {
queue.Add(key)
}
},
}, cache.Indexers{})

go controller.Run(1, stop)
evicter := newPodEvicter(clientset.PolicyV1beta1(), eventRecorder, *terminationGracePeriodSeconds)
controller := NewController(queue, indexer, informer, evicter, *taintedIncubationPeriodSeconds)
controller.Run(1, ctx.Done())
},
OnStoppedLeading: func() {
// we can do cleanup here
klog.Infof("Leader lost: %s", *instanceID)
os.Exit(0)
},
OnNewLeader: func(identity string) {
if identity == *instanceID {
return
}
klog.Infof("New leader elected: %s", identity)
},
},
})
}

// todo: watch sigterm
// todo: recover panic to log
select {}
// watchSigTerm registers a handler for SIGINT and SIGTERM that cancels the
// given context, allowing the process to shut down gracefully.
func watchSigTerm(cancel context.CancelFunc) {
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt, syscall.SIGTERM)
	waitAndCancel := func() {
		// Block until a termination signal arrives, then cancel the
		// root context so all ctx-aware loops wind down.
		<-signals
		klog.Info("Received termination, signaling shutdown")
		cancel()
	}
	go waitAndCancel()
}

// podEvicter evicts pods through the Kubernetes eviction API and records a
// cluster event for every successful eviction. It satisfies podProvisioner.
type podEvicter struct {
	client               v1beta1.PolicyV1beta1Interface
	eventRecorder        record.EventRecorder
	defaultDeleteOptions *metav1.DeleteOptions
}

func newPodEvicter(client v1beta1.PolicyV1beta1Interface, recorder events.EventRecorder, gracePeriodSeconds int64) *podEvicter {
func newPodEvicter(client v1beta1.PolicyV1beta1Interface, recorder record.EventRecorder, gracePeriodSeconds int64) *podEvicter {
return &podEvicter{
client: client,
eventRecorder: recorder,
defaultDeleteOptions: &metav1.DeleteOptions{GracePeriodSeconds: &gracePeriodSeconds},
}
}

func (p *podEvicter) Evict(pod *v1.Pod, reason string) error {
// Evict calls the k8s api to evict the given pod. Reason and notes are stored with the audit event.
func (p *podEvicter) Evict(pod *v1.Pod, reason, msg string) error {
err := p.client.Evictions(pod.Namespace).Evict(newEviction(pod, p.defaultDeleteOptions))
klog.Infof("Evicted pod %q (UID: %s)", resource.GetResourceName(pod.ObjectMeta), pod.UID)
if err != nil {
return errors.Wrap(err, "eviction")
}
p.eventRecorder.Eventf(pod, nil, v1.EventTypeNormal, reason, "Eviction", "")
p.eventRecorder.Eventf(pod, v1.EventTypeNormal, reason, msg)
return nil
}

Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ go 1.12

require (
github.com/gobwas/glob v0.2.3
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/google/uuid v1.1.1
github.com/googleapis/gnostic v0.3.1 // indirect
github.com/gorilla/mux v1.7.0
github.com/imdario/mergo v0.3.8 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZ
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/evanphx/json-patch v4.2.0+incompatible h1:fUDGZCv/7iAN7u0puUVhvKCcsR6vRfwrJatElLBEf0I=
github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
Expand All @@ -35,6 +36,7 @@ github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y=
github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8=
github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d h1:3PaI8p3seN09VjbTYC/QWlUZdZ1qS1zGjy7LH2Wt07I=
github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903 h1:LbsanbbD6LieFkXbj9YNNBupiGHJgFeLpO0j0Fza1h8=
github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
Expand Down

0 comments on commit 00bb537

Please sign in to comment.