Skip to content

Commit

Permalink
Fix: READY and ACTIVE fields of ScaledJob to show status. (kedacore#1855
Browse files Browse the repository at this point in the history
)

Signed-off-by: Shubham Kuchhal <shubham.kuchhal@india.nec.com>
Signed-off-by: nilayasiktoprak <nilayasiktoprak@gmail.com>
  • Loading branch information
Shubham82 authored and nilayasiktoprak committed Oct 23, 2021
1 parent f29f780 commit d80447e
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 25 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

### Improvements

- TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX))
- Fix READY and ACTIVE fields of ScaledJob to show status when we run `kubectl get sj` ([#1855](https://github.com/kedacore/keda/pull/1855))

### Breaking Changes

Expand Down
61 changes: 39 additions & 22 deletions controllers/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"time"

kedacontrollerutil "github.com/kedacore/keda/v2/controllers/util"
"github.com/kedacore/keda/v2/pkg/eventreason"
corev1 "k8s.io/api/core/v1"

Expand Down Expand Up @@ -80,47 +81,63 @@ func (r *ScaledJobReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
return ctrl.Result{}, err
}

var errMsg string
if scaledJob.Spec.JobTargetRef != nil {
reqLogger.Info("Detected ScaleType = Job")
conditions := scaledJob.Status.Conditions.DeepCopy()
msg, err := r.reconcileScaledJob(reqLogger, scaledJob)
if err != nil {
reqLogger.Error(err, msg)
conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledJobCheckFailed", msg)
conditions.SetActiveCondition(metav1.ConditionUnknown, "UnknownState", "ScaledJob check failed")
r.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.ScaledJobCheckFailed, msg)
} else {
wasReady := conditions.GetReadyCondition()
if wasReady.IsFalse() || wasReady.IsUnknown() {
r.Recorder.Event(scaledJob, corev1.EventTypeNormal, eventreason.ScaledJobReady, "ScaledJob is ready for scaling")
}
reqLogger.V(1).Info(msg)
conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledJobReady", msg)
// ensure Status Conditions are initialized
if !scaledJob.Status.Conditions.AreInitialized() {
conditions := kedav1alpha1.GetInitializedConditions()
if err := kedacontrollerutil.SetStatusConditions(r.Client, reqLogger, scaledJob, conditions); err != nil {
return ctrl.Result{}, err
}
}

// Check jobTargetRef is specified
if scaledJob.Spec.JobTargetRef == nil {
errMsg := "scaledJob.spec.jobTargetRef is not set"
err := fmt.Errorf(errMsg)
reqLogger.Error(err, "scaledJob.spec.jobTargetRef not found")
return ctrl.Result{}, err
}
msg, err := r.reconcileScaledJob(reqLogger, scaledJob)
conditions := scaledJob.Status.Conditions.DeepCopy()
if err != nil {
reqLogger.Error(err, msg)
conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledJobCheckFailed", msg)
conditions.SetActiveCondition(metav1.ConditionUnknown, "UnknownState", "ScaledJob check failed")
r.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.ScaledJobCheckFailed, msg)
} else {
wasReady := conditions.GetReadyCondition()
if wasReady.IsFalse() || wasReady.IsUnknown() {
r.Recorder.Event(scaledJob, corev1.EventTypeNormal, eventreason.ScaledJobReady, "ScaledJob is ready for scaling")
}
reqLogger.V(1).Info(msg)
conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledJobReady", msg)
}

errMsg = "scaledJob.Spec.JobTargetRef is not set"
err = fmt.Errorf(errMsg)
reqLogger.Error(err, "scaledJob.Spec.JobTargetRef not found")
if err := kedacontrollerutil.SetStatusConditions(r.Client, reqLogger, scaledJob, &conditions); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, err
}

// reconcileJobType implemets reconciler logic for K8s Jobs based ScaleObject
// reconcileScaledJob implements reconciler logic for K8s Jobs based ScaledJob
func (r *ScaledJobReconciler) reconcileScaledJob(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) (string, error) {
msg, err := r.deletePreviousVersionScaleJobs(logger, scaledJob)
if err != nil {
return msg, err
}

// Check ScaledJob is Ready or not
_, err = r.scaleHandler.GetScalers(scaledJob)
if err != nil {
logger.Error(err, "Error getting scalers")
return "Failed to ensure ScaledJob is correctly created", err
}

// scaledJob was created or modified - let's start a new ScaleLoop
err = r.requestScaleLoop(logger, scaledJob)
if err != nil {
return "Failed to start a new scale loop with scaling logic", err
}
logger.Info("Initializing Scaling logic according to ScaledObject Specification")
logger.Info("Initializing Scaling logic according to ScaledJob Specification")
return "ScaledJob is defined correctly and is ready to scaling", nil
}

Expand Down
19 changes: 17 additions & 2 deletions pkg/scaling/executor/scale_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,21 @@ func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1al
logger.V(1).Info("No change in activity")
}

condition := scaledJob.Status.Conditions.GetActiveCondition()
if condition.IsUnknown() || condition.IsTrue() != isActive {
if isActive {
if err := e.setActiveCondition(ctx, logger, scaledJob, metav1.ConditionTrue, "ScalerActive", "Scaling is performed because triggers are active"); err != nil {
logger.Error(err, "Error setting active condition when triggers are active")
return
}
} else {
if err := e.setActiveCondition(ctx, logger, scaledJob, metav1.ConditionFalse, "ScalerNotActive", "Scaling is not performed because triggers are not active"); err != nil {
logger.Error(err, "Error setting active condition when triggers are not active")
return
}
}
}

err := e.cleanUp(scaledJob)
if err != nil {
logger.Error(err, "Failed to cleanUp jobs")
Expand Down Expand Up @@ -98,10 +113,10 @@ func (e *scaleExecutor) createJobs(logger logr.Logger, scaledJob *kedav1alpha1.S
job.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicyOnFailure
}

// Set ScaledObject instance as the owner and controller
// Set ScaledJob instance as the owner and controller
err := controllerutil.SetControllerReference(scaledJob, job, e.reconcilerScheme)
if err != nil {
logger.Error(err, "Failed to set ScaledObject as the owner of the new Job")
logger.Error(err, "Failed to set ScaledJob as the owner of the new Job")
}

err = e.client.Create(context.TODO(), job)
Expand Down

0 comments on commit d80447e

Please sign in to comment.