From b943f36e09c41c0d61054425a05f22e40cea62e2 Mon Sep 17 00:00:00 2001 From: Ahmed ElSayed Date: Thu, 21 Jan 2021 13:24:10 -0800 Subject: [PATCH] Emit kubernetes events from KEDA Signed-off-by: Ahmed ElSayed --- CHANGELOG.md | 1 + adapter/main.go | 6 +++- controllers/scaledjob_controller.go | 8 +++++- controllers/scaledjob_finalizer.go | 3 ++ controllers/scaledobject_controller.go | 14 +++++++--- controllers/scaledobject_finalizer.go | 3 ++ go.sum | 1 + main.go | 14 ++++++---- pkg/eventreason/eventreason.go | 31 +++++++++++++++++++++ pkg/scaling/executor/scale_executor.go | 5 +++- pkg/scaling/executor/scale_jobs.go | 2 ++ pkg/scaling/executor/scale_scaledobjects.go | 10 ++++++- pkg/scaling/scale_handler.go | 12 ++++++-- 13 files changed, 94 insertions(+), 16 deletions(-) create mode 100644 pkg/eventreason/eventreason.go diff --git a/CHANGELOG.md b/CHANGELOG.md index aa00cf7ecd8..8027513a94e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ - Add Redis cluster support for Redis list and Redis streams scalers ([#1437](https://github.com/kedacore/keda/pull/1437)) - Global authentication credentials can be managed using `ClusterTriggerAuthentication` objects ([#1452](https://github.com/kedacore/keda/pull/1452)) - Introducing OpenStack Swift scaler ([#1342](https://github.com/kedacore/keda/issues/1342)) +- Emit Kubernetes Events on KEDA events ([#1523]) ### Improvements - Support add ScaledJob's label to its job ([#1311](https://github.com/kedacore/keda/issues/1311)) diff --git a/adapter/main.go b/adapter/main.go index 1a44e307345..bb6134ba446 100644 --- a/adapter/main.go +++ b/adapter/main.go @@ -3,6 +3,8 @@ package main import ( "flag" "fmt" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/record" "os" "runtime" "strconv" @@ -67,7 +69,9 @@ func (a *Adapter) makeProvider(globalHTTPTimeout time.Duration) (provider.Metric return nil, fmt.Errorf("unable to construct new client (%s)", err) } - handler := scaling.NewScaleHandler(kubeclient, nil, scheme, globalHTTPTimeout) + broadcaster := record.NewBroadcaster() + recorder := broadcaster.NewRecorder(scheme, corev1.EventSource{Component: "keda-metrics-adapter"}) + handler := scaling.NewScaleHandler(kubeclient, nil, scheme, globalHTTPTimeout, recorder) namespace, err := getWatchNamespace() if err != nil { diff --git a/controllers/scaledjob_controller.go b/controllers/scaledjob_controller.go index df7726f2ab9..9c9e6b76fcf 100644 --- a/controllers/scaledjob_controller.go +++ b/controllers/scaledjob_controller.go @@ -3,6 +3,9 @@ package controllers import ( "context" "fmt" + "github.com/kedacore/keda/v2/pkg/eventreason" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/record" "time" "github.com/go-logr/logr" @@ -28,13 +31,14 @@ type ScaledJobReconciler struct { client.Client Log logr.Logger Scheme *runtime.Scheme + Recorder record.EventRecorder scaleHandler scaling.ScaleHandler globalHTTPTimeout time.Duration } // SetupWithManager initializes the ScaledJobReconciler instance and starts a new controller managed by the passed Manager instance. func (r *ScaledJobReconciler) SetupWithManager(mgr ctrl.Manager) error { - r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme(), r.globalHTTPTimeout) + r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme(), r.globalHTTPTimeout, mgr.GetEventRecorderFor("scale-handler")) return ctrl.NewControllerManagedBy(mgr). 
// Ignore updates to ScaledJob Status (in this case metadata.Generation does not change) @@ -84,9 +88,11 @@ func (r *ScaledJobReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { reqLogger.Error(err, msg) conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledJobCheckFailed", msg) conditions.SetActiveCondition(metav1.ConditionUnknown, "UnknownState", "ScaledJob check failed") + r.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.CheckFailed, msg) } else { reqLogger.V(1).Info(msg) conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledJobReady", msg) + r.Recorder.Event(scaledJob, corev1.EventTypeNormal, eventreason.Ready, msg) } return ctrl.Result{}, err diff --git a/controllers/scaledjob_finalizer.go b/controllers/scaledjob_finalizer.go index aa0ca78c7a6..73eb7ab1a2b 100644 --- a/controllers/scaledjob_finalizer.go +++ b/controllers/scaledjob_finalizer.go @@ -2,6 +2,8 @@ package controllers import ( "context" + "github.com/kedacore/keda/v2/pkg/eventreason" + corev1 "k8s.io/api/core/v1" "github.com/go-logr/logr" @@ -33,6 +35,7 @@ func (r *ScaledJobReconciler) finalizeScaledJob(logger logr.Logger, scaledJob *k } logger.Info("Successfully finalized ScaledJob") + r.Recorder.Event(scaledJob, corev1.EventTypeNormal, eventreason.Deleted, "ScaledJob was deleted") return nil } diff --git a/controllers/scaledobject_controller.go b/controllers/scaledobject_controller.go index 0414fd9a3f5..e16883be06d 100644 --- a/controllers/scaledobject_controller.go +++ b/controllers/scaledobject_controller.go @@ -3,6 +3,9 @@ package controllers import ( "context" "fmt" + "github.com/kedacore/keda/v2/pkg/eventreason" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/record" "sync" "time" @@ -42,9 +45,10 @@ import ( // ScaledObjectReconciler reconciles a ScaledObject object type ScaledObjectReconciler struct { - Log logr.Logger - Client client.Client - Scheme *runtime.Scheme + Log logr.Logger + Client client.Client + Scheme *runtime.Scheme + Recorder record.EventRecorder scaleClient *scale.ScalesGetter restMapper meta.RESTMapper @@ -92,7 +96,7 @@ func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager) error { // Init the rest of ScaledObjectReconciler r.restMapper = mgr.GetRESTMapper() r.scaledObjectsGenerations = &sync.Map{} - r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), r.scaleClient, mgr.GetScheme(), r.globalHTTPTimeout) + r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), r.scaleClient, mgr.GetScheme(), r.globalHTTPTimeout, mgr.GetEventRecorderFor("scale-handler")) // Start controller return ctrl.NewControllerManagedBy(mgr). 
@@ -160,9 +164,11 @@ func (r *ScaledObjectReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error reqLogger.Error(err, msg) conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledObjectCheckFailed", msg) conditions.SetActiveCondition(metav1.ConditionUnknown, "UnkownState", "ScaledObject check failed") + r.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.CheckFailed, msg) } else { reqLogger.V(1).Info(msg) conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledObjectReady", msg) + r.Recorder.Event(scaledObject, corev1.EventTypeNormal, eventreason.Ready, msg) } if err := kedacontrollerutil.SetStatusConditions(r.Client, reqLogger, scaledObject, &conditions); err != nil { return ctrl.Result{}, err diff --git a/controllers/scaledobject_finalizer.go b/controllers/scaledobject_finalizer.go index 778ee67ee25..216615a1822 100644 --- a/controllers/scaledobject_finalizer.go +++ b/controllers/scaledobject_finalizer.go @@ -2,6 +2,8 @@ package controllers import ( "context" + "github.com/kedacore/keda/v2/pkg/eventreason" + corev1 "k8s.io/api/core/v1" "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/api/errors" @@ -54,6 +56,7 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(logger logr.Logger, scaled } logger.Info("Successfully finalized ScaledObject") + r.Recorder.Event(scaledObject, corev1.EventTypeNormal, eventreason.Deleted, "ScaledObject was deleted") return nil } diff --git a/go.sum b/go.sum index e45c04315bc..de02f21d99a 100644 --- a/go.sum +++ b/go.sum @@ -892,6 +892,7 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= +github.com/kedacore/keda v1.5.0 h1:c8xA1Vo3H7rPwFiWUX3CBXnjBSrbYDmUs9iEfDlf4bQ= github.com/kelseyhightower/envconfig v1.3.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM= diff --git a/main.go b/main.go index 99edd95a0f0..478c681188a 100644 --- a/main.go +++ b/main.go @@ -109,17 +109,19 @@ func main() { } if err = (&controllers.ScaledObjectReconciler{ - Client: mgr.GetClient(), - Log: ctrl.Log.WithName("controllers").WithName("ScaledObject"), - Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + Log: ctrl.Log.WithName("controllers").WithName("ScaledObject"), + Scheme: mgr.GetScheme(), + Recorder: mgr.GetEventRecorderFor("scaledobject-controller"), }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "ScaledObject") os.Exit(1) } if err = (&controllers.ScaledJobReconciler{ - Client: mgr.GetClient(), - Log: ctrl.Log.WithName("controllers").WithName("ScaledJob"), - Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + Log: ctrl.Log.WithName("controllers").WithName("ScaledJob"), + Scheme: mgr.GetScheme(), + Recorder: mgr.GetEventRecorderFor("scaledjob-controller"), }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "ScaledJob") os.Exit(1) diff --git a/pkg/eventreason/eventreason.go 
b/pkg/eventreason/eventreason.go new file mode 100644 index 00000000000..c7caeac0cd9 --- /dev/null +++ b/pkg/eventreason/eventreason.go @@ -0,0 +1,31 @@ +/* +Copyright 2020 The KEDA Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventreason + +const ( + Ready = "Ready" + CheckFailed = "CheckFailed" + Deleted = "Deleted" + ScalersStarted = "ScalersStarted" + ScalersRestarted = "ScalersRestarted" + ScalersStopped = "ScalersStopped" + ScaleTargetActivated = "ScaleTargetActivated" + ScaleTargetDeactivated = "ScaleTargetDeactivated" + ScaleTargetActivationFailed = "ScaleTargetActivationFailed" + ScaleTargetDeactivationFailed = "ScaleTargetDeactivationFailed" + JobsCreated = "JobsCreated" +) diff --git a/pkg/scaling/executor/scale_executor.go b/pkg/scaling/executor/scale_executor.go index 4a703afe8d8..18c412f06c0 100644 --- a/pkg/scaling/executor/scale_executor.go +++ b/pkg/scaling/executor/scale_executor.go @@ -3,6 +3,7 @@ package executor import ( "context" "fmt" + "k8s.io/client-go/tools/record" "github.com/go-logr/logr" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -30,15 +31,17 @@ type scaleExecutor struct { scaleClient *scale.ScalesGetter reconcilerScheme *runtime.Scheme logger logr.Logger + recorder record.EventRecorder } // NewScaleExecutor creates a ScaleExecutor object -func NewScaleExecutor(client client.Client, scaleClient *scale.ScalesGetter, reconcilerScheme *runtime.Scheme) ScaleExecutor { +func NewScaleExecutor(client client.Client, scaleClient *scale.ScalesGetter, reconcilerScheme *runtime.Scheme, recorder record.EventRecorder) ScaleExecutor { return &scaleExecutor{ client: client, scaleClient: scaleClient, reconcilerScheme: reconcilerScheme, logger: logf.Log.WithName("scaleexecutor"), + recorder: recorder, } } diff --git a/pkg/scaling/executor/scale_jobs.go b/pkg/scaling/executor/scale_jobs.go index 43c72272ee5..a01c4424fc4 100644 --- a/pkg/scaling/executor/scale_jobs.go +++ b/pkg/scaling/executor/scale_jobs.go @@ -2,6 +2,7 @@ package executor import ( "context" + "github.com/kedacore/keda/v2/pkg/eventreason" "sort" "strconv" @@ -108,6 +109,7 @@ func (e *scaleExecutor) createJobs(logger logr.Logger, scaledJob *kedav1alpha1.S } } logger.Info("Created jobs", "Number of jobs", scaleTo) + e.recorder.Eventf(scaledJob, corev1.EventTypeNormal, eventreason.JobsCreated, "Created %d jobs", scaleTo) } func (e *scaleExecutor) isJobFinished(j *batchv1.Job) bool { diff --git a/pkg/scaling/executor/scale_scaledobjects.go b/pkg/scaling/executor/scale_scaledobjects.go index 6e4a5c1cc4b..5716ba2b697 100644 --- a/pkg/scaling/executor/scale_scaledobjects.go +++ b/pkg/scaling/executor/scale_scaledobjects.go @@ -2,6 +2,8 @@ package executor import ( "context" + "github.com/kedacore/keda/v2/pkg/eventreason" + corev1 "k8s.io/api/core/v1" "time" "github.com/go-logr/logr" @@ -121,13 +123,16 @@ func (e *scaleExecutor) scaleToZero(ctx context.Context, logger logr.Logger, sca if scaledObject.Status.LastActiveTime == nil || 
scaledObject.Status.LastActiveTime.Add(cooldownPeriod).Before(time.Now()) { // or last time a trigger was active was > cooldown period, so scale down. - _, err := e.updateScaleOnScaleTarget(ctx, scaledObject, scale, 0) + currentReplicas, err := e.updateScaleOnScaleTarget(ctx, scaledObject, scale, 0) if err == nil { logger.Info("Successfully scaled ScaleTarget to 0 replicas") + e.recorder.Eventf(scaledObject, corev1.EventTypeNormal, eventreason.ScaleTargetDeactivated, "Deactivated %s %s/%s from %d to %d", scaledObject.Spec.ScaleTargetRef.Kind, scaledObject.Namespace, scaledObject.Spec.ScaleTargetRef.Name, currentReplicas, 0) if err := e.setActiveCondition(ctx, logger, scaledObject, metav1.ConditionFalse, "ScalerNotActive", "Scaling is not performed because triggers are not active"); err != nil { logger.Error(err, "Error in setting active condition") return } + } else { + e.recorder.Eventf(scaledObject, corev1.EventTypeWarning, eventreason.ScaleTargetDeactivationFailed, "Failed to deactivate %s %s/%s", scaledObject.Spec.ScaleTargetRef.Kind, scaledObject.Namespace, scaledObject.Spec.ScaleTargetRef.Name) } } else { logger.V(1).Info("ScaleTarget cooling down", @@ -158,12 +163,15 @@ func (e *scaleExecutor) scaleFromZero(ctx context.Context, logger logr.Logger, s logger.Info("Successfully updated ScaleTarget", "Original Replicas Count", currentReplicas, "New Replicas Count", replicas) + e.recorder.Eventf(scaledObject, corev1.EventTypeNormal, eventreason.ScaleTargetActivated, "Scaled %s %s/%s from %d to %d", scaledObject.Spec.ScaleTargetRef.Kind, scaledObject.Namespace, scaledObject.Spec.ScaleTargetRef.Name, currentReplicas, replicas) // Scale was successful. Update lastScaleTime and lastActiveTime on the scaledObject if err := e.updateLastActiveTime(ctx, logger, scaledObject); err != nil { logger.Error(err, "Error in Updating lastScaleTime and lastActiveTime on the scaledObject") return } + } else { + e.recorder.Eventf(scaledObject, corev1.EventTypeWarning, eventreason.ScaleTargetActivationFailed, "Failed to scale %s %s/%s from %d to %d", scaledObject.Spec.ScaleTargetRef.Kind, scaledObject.Namespace, scaledObject.Spec.ScaleTargetRef.Name, currentReplicas, replicas) } } diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index fc18381f2f6..479bcc58101 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -3,6 +3,8 @@ package scaling import ( "context" "fmt" + "github.com/kedacore/keda/v2/pkg/eventreason" + "k8s.io/client-go/tools/record" "sync" "time" @@ -44,16 +46,18 @@ type scaleHandler struct { scaleLoopContexts *sync.Map scaleExecutor executor.ScaleExecutor globalHTTPTimeout time.Duration + recorder record.EventRecorder } // NewScaleHandler creates a ScaleHandler object -func NewScaleHandler(client client.Client, scaleClient *scale.ScalesGetter, reconcilerScheme *runtime.Scheme, globalHTTPTimeout time.Duration) ScaleHandler { +func NewScaleHandler(client client.Client, scaleClient *scale.ScalesGetter, reconcilerScheme *runtime.Scheme, globalHTTPTimeout time.Duration, recorder record.EventRecorder) ScaleHandler { return &scaleHandler{ client: client, logger: logf.Log.WithName("scalehandler"), scaleLoopContexts: &sync.Map{}, - scaleExecutor: executor.NewScaleExecutor(client, scaleClient, reconcilerScheme), + scaleExecutor: executor.NewScaleExecutor(client, scaleClient, reconcilerScheme, recorder), globalHTTPTimeout: globalHTTPTimeout, + recorder: recorder, } } @@ -90,6 +94,9 @@ func (h *scaleHandler)
HandleScalableObject(scalableObject interface{}) error { cancelValue() } h.scaleLoopContexts.Store(key, cancel) + h.recorder.Event(withTriggers, corev1.EventTypeNormal, eventreason.ScalersRestarted, "Restarted scalers watch") + } else { + h.recorder.Event(withTriggers, corev1.EventTypeNormal, eventreason.ScalersStarted, "Started scalers watch") } // a mutex is used to synchronize scale requests per scalableObject @@ -115,6 +122,7 @@ func (h *scaleHandler) DeleteScalableObject(scalableObject interface{}) error { cancel() } h.scaleLoopContexts.Delete(key) + h.recorder.Event(withTriggers, corev1.EventTypeNormal, eventreason.ScalersStopped, "Stopped scalers watch") } else { h.logger.V(1).Info("ScaleObject was not found in controller cache", "key", key) }
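A minimal sketch (not part of this patch) of the recorder wiring introduced above: the controllers take an EventRecorder from controller-runtime via mgr.GetEventRecorderFor(...), the metrics adapter builds one by hand from a record.Broadcaster, and both pass it down to the scale handler and executor, which emit Events against the ScaledObject or ScaledJob. In the sketch below the package layout, the Pod used as the involved object, and the literal "Ready" reason (standing in for the constants in pkg/eventreason) are illustrative assumptions only. A hand-built broadcaster only delivers Events to the API server once a sink is attached (for example via StartRecordingToSink); the sketch omits that step, so the Event stays in-process.

    package main

    import (
    	corev1 "k8s.io/api/core/v1"
    	"k8s.io/apimachinery/pkg/runtime"
    	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    	"k8s.io/client-go/tools/record"
    )

    func main() {
    	// The scheme must know the involved object's type so the recorder can
    	// build an ObjectReference for the emitted Event.
    	scheme := runtime.NewScheme()
    	_ = clientgoscheme.AddToScheme(scheme)

    	// Stand-alone recorder, mirroring what adapter/main.go does.
    	broadcaster := record.NewBroadcaster()
    	recorder := broadcaster.NewRecorder(scheme, corev1.EventSource{Component: "keda-metrics-adapter"})

    	// Any runtime.Object registered in the scheme can be the involved object;
    	// a Pod stands in here for a ScaledObject or ScaledJob.
    	pod := &corev1.Pod{}
    	pod.Name, pod.Namespace = "example", "default"
    	recorder.Event(pod, corev1.EventTypeNormal, "Ready", "illustrative event from the sketch")
    }

Once a recorder wired this way is attached to a sink (or obtained from the manager, which sets the sink up for you), the emitted Events should be visible with kubectl get events -n <namespace> and under the Events section of kubectl describe scaledobject <name>.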