Skip to content

Commit

Permalink
triggerName is part of Prometheus metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Zbynek Roubalik <zroubalik@gmail.com>
  • Loading branch information
zroubalik committed Nov 8, 2022
1 parent 4855d55 commit 21a442b
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 73 deletions.
4 changes: 2 additions & 2 deletions controllers/keda/hpa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ func setupTest(health map[string]v1alpha1.HealthStatus, scaler *mock_scalers.Moc
scalersCache := cache.ScalersCache{
Scalers: []cache.ScalerBuilder{{
Scaler: scaler,
Factory: func() (scalers.Scaler, error) {
return scaler, nil
Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) {
return scaler, &scalers.ScalerConfig{}, nil
},
}},
Logger: logr.Discard(),
Expand Down
26 changes: 26 additions & 0 deletions controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,11 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg
return "ScaledObject doesn't have correct Idle/Min/Max Replica Counts specification", err
}

err = r.checkTriggerNamesAreUnique(scaledObject)
if err != nil {
return "ScaledObject doesn't have correct triggers specification", err
}

// Create a new HPA or update existing one according to ScaledObject
newHPACreated, err := r.ensureHPAForScaledObjectExists(ctx, logger, scaledObject, &gvkr)
if err != nil {
Expand Down Expand Up @@ -357,6 +362,27 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Conte
return gvkr, nil
}

// checkTriggerNamesAreUnique checks that all triggerNames in ScaledObject are unique
func (r *ScaledObjectReconciler) checkTriggerNamesAreUnique(scaledObject *kedav1alpha1.ScaledObject) error {
triggersCount := len(scaledObject.Spec.Triggers)

if triggersCount > 1 {
triggerNames := make(map[string]bool, triggersCount)
for i := 0; i < triggersCount; i++ {
name := scaledObject.Spec.Triggers[i].Name
if name != "" {
if _, found := triggerNames[name]; found {
// found duplicate name
return fmt.Errorf("triggerName=%s is defined multiple times in the ScaledObject, but it must be unique", name)
}
triggerNames[name] = true
}
}
}

return nil
}

// checkReplicaCountBoundsAreValid checks that Idle/Min/Max ReplicaCount defined in ScaledObject are correctly specified
// ie. that Min is not greater then Max or Idle greater or equal to Min
func (r *ScaledObjectReconciler) checkReplicaCountBoundsAreValid(scaledObject *kedav1alpha1.ScaledObject) error {
Expand Down
69 changes: 63 additions & 6 deletions controllers/keda/scaledobject_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,9 @@ var _ = Describe("ScaledObjectController", func() {

testScalers = append(testScalers, cache.ScalerBuilder{
Scaler: s,
Factory: func() (scalers.Scaler, error) {
return scalers.NewPrometheusScaler(config)
Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) {
scaler, err := scalers.NewPrometheusScaler(config)
return scaler, config, err
},
})
for _, metricSpec := range s.GetMetricSpecForScaling(context.Background()) {
Expand Down Expand Up @@ -161,8 +162,8 @@ var _ = Describe("ScaledObjectController", func() {
scalersCache := cache.ScalersCache{
Scalers: []cache.ScalerBuilder{{
Scaler: s,
Factory: func() (scalers.Scaler, error) {
return s, nil
Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) {
return s, config, nil
},
}},
}
Expand Down Expand Up @@ -205,8 +206,8 @@ var _ = Describe("ScaledObjectController", func() {

testScalers = append(testScalers, cache.ScalerBuilder{
Scaler: s,
Factory: func() (scalers.Scaler, error) {
return s, nil
Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) {
return s, config, nil
},
})
}
Expand Down Expand Up @@ -659,6 +660,62 @@ var _ = Describe("ScaledObjectController", func() {
return so.Status.Conditions.GetReadyCondition().Status
}, 20*time.Second).Should(Equal(metav1.ConditionFalse))
})

It("doesn't allow non-unique triggerName in ScaledObject", func() {
deploymentName := "non-unique-triggername"
soName := "so-" + deploymentName

triggerName := "non-unique"

// Create the scaling target.
err := k8sClient.Create(context.Background(), generateDeployment(deploymentName))
Expect(err).ToNot(HaveOccurred())

var five int32 = 5
var ten int32 = 10

// Create the ScaledObject with two triggers
so := &kedav1alpha1.ScaledObject{
ObjectMeta: metav1.ObjectMeta{Name: soName, Namespace: "default"},
Spec: kedav1alpha1.ScaledObjectSpec{
ScaleTargetRef: &kedav1alpha1.ScaleTarget{
Name: deploymentName,
},
IdleReplicaCount: &ten,
MinReplicaCount: &five,
Triggers: []kedav1alpha1.ScaleTriggers{
{
Type: "cron",
Name: triggerName,
Metadata: map[string]string{
"timezone": "UTC",
"start": "0 * * * *",
"end": "1 * * * *",
"desiredReplicas": "1",
},
},
{
Type: "cron",
Name: triggerName,
Metadata: map[string]string{
"timezone": "UTC",
"start": "10 * * * *",
"end": "11 * * * *",
"desiredReplicas": "1",
},
},
},
},
}
err = k8sClient.Create(context.Background(), so)
Ω(err).ToNot(HaveOccurred())

Eventually(func() metav1.ConditionStatus {
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so)
Ω(err).ToNot(HaveOccurred())
return so.Status.Conditions.GetReadyCondition().Status
}, 20*time.Second).Should(Equal(metav1.ConditionFalse))
})
})

It("scaleobject ready condition 'False/Unknow' to 'True' will requeue", func() {
Expand Down
16 changes: 8 additions & 8 deletions pkg/prommetrics/adapter_prommetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

var (
metricLabels = []string{"namespace", "metric", "scaledObject", "scaler", "scalerIndex", "scalerName"}
metricLabels = []string{"namespace", "metric", "scaledObject", "scaler", "scalerIndex"}
scalerErrorsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "keda_metrics_adapter",
Expand Down Expand Up @@ -100,21 +100,21 @@ func (metricsServer PrometheusMetricServer) NewServer(address string, pattern st
}

// RecordHPAScalerMetric create a measurement of the external metric used by the HPA
func (metricsServer PrometheusMetricServer) RecordHPAScalerMetric(namespace string, scaledObject string, scaler string, scalerIndex int, scalerName string, metric string, value float64) {
scalerMetricsValue.With(getLabels(namespace, scaledObject, scaler, scalerIndex, scalerName, metric)).Set(value)
func (metricsServer PrometheusMetricServer) RecordHPAScalerMetric(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, value float64) {
scalerMetricsValue.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Set(value)
}

// RecordHPAScalerError counts the number of errors occurred in trying get an external metric used by the HPA
func (metricsServer PrometheusMetricServer) RecordHPAScalerError(namespace string, scaledObject string, scaler string, scalerIndex int, scalerName string, metric string, err error) {
func (metricsServer PrometheusMetricServer) RecordHPAScalerError(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, err error) {
if err != nil {
scalerErrors.With(getLabels(namespace, scaledObject, scaler, scalerIndex, scalerName, metric)).Inc()
scalerErrors.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Inc()
// scaledObjectErrors.With(prometheus.Labels{"namespace": namespace, "scaledObject": scaledObject}).Inc()
metricsServer.RecordScalerObjectError(namespace, scaledObject, err)
scalerErrorsTotal.With(prometheus.Labels{}).Inc()
return
}
// initialize metric with 0 if not already set
_, errscaler := scalerErrors.GetMetricWith(getLabels(namespace, scaledObject, scaler, scalerIndex, scalerName, metric))
_, errscaler := scalerErrors.GetMetricWith(getLabels(namespace, scaledObject, scaler, scalerIndex, metric))
if errscaler != nil {
log.Fatalf("Unable to write to serve custom metrics: %v", errscaler)
}
Expand All @@ -135,6 +135,6 @@ func (metricsServer PrometheusMetricServer) RecordScalerObjectError(namespace st
}
}

func getLabels(namespace string, scaledObject string, scaler string, scalerIndex int, scalerName string, metric string) prometheus.Labels {
return prometheus.Labels{"namespace": namespace, "scaledObject": scaledObject, "scaler": scaler, "scalerIndex": strconv.Itoa(scalerIndex), "scalerName": scalerName, "metric": metric}
func getLabels(namespace string, scaledObject string, scaler string, scalerIndex int, metric string) prometheus.Labels {
return prometheus.Labels{"namespace": namespace, "scaledObject": scaledObject, "scaler": scaler, "scalerIndex": strconv.Itoa(scalerIndex), "metric": metric}
}
19 changes: 11 additions & 8 deletions pkg/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,15 @@ func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string,
return nil, fmt.Errorf("error when getting scalers %s", err)
}

// let's check metrics for all scalers in a ScaledObject
scalerError := false

for scalerIndex, scalerPair := range cache.GetScalers() {
metricSpecs := scalerPair.Scaler.GetMetricSpecForScaling(ctx)
scalerName := strings.Replace(fmt.Sprintf("%T", scalerPair.Scaler), "*scalers.", "", 1)
triggerName := scalerPair.TriggerName
scalers, scalerConfigs := cache.GetScalers()
for scalerIndex := 0; scalerIndex < len(scalers); scalerIndex++ {
metricSpecs := scalers[scalerIndex].GetMetricSpecForScaling(ctx)
scalerName := strings.Replace(fmt.Sprintf("%T", scalers[scalerIndex]), "*scalers.", "", 1)
if scalerConfigs[scalerIndex].TriggerName != "" {
scalerName = scalerConfigs[scalerIndex].TriggerName
}

for _, metricSpec := range metricSpecs {
// skip cpu/memory resource scaler
Expand All @@ -121,15 +124,15 @@ func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string,
metrics, err = p.getMetricsWithFallback(ctx, metrics, err, info.Metric, scaledObject, metricSpec)
if err != nil {
scalerError = true
logger.Error(err, "error getting metric for scaler", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name, "scaler", scalerPair.Scaler)
logger.Error(err, "error getting metric for scaler", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name, "scaler", scalerName)
} else {
for _, metric := range metrics {
metricValue := metric.Value.AsApproximateFloat64()
metricsServer.RecordHPAScalerMetric(namespace, scaledObject.Name, scalerName, scalerIndex, triggerName, metric.MetricName, metricValue)
metricsServer.RecordHPAScalerMetric(namespace, scaledObject.Name, scalerName, scalerIndex, metric.MetricName, metricValue)
}
matchingMetrics = append(matchingMetrics, metrics...)
}
metricsServer.RecordHPAScalerError(namespace, scaledObject.Name, scalerName, scalerIndex, triggerName, info.Metric, err)
metricsServer.RecordHPAScalerError(namespace, scaledObject.Name, scalerName, scalerIndex, info.Metric, err)
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/scalers/scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ type ScalerConfig struct {
// The timeout to be used on all HTTP requests from the controller
GlobalHTTPTimeout time.Duration

// Name of the trigger
TriggerName string

// TriggerMetadata
TriggerMetadata map[string]string

Expand Down
32 changes: 14 additions & 18 deletions pkg/scaling/cache/scalers_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,25 +42,20 @@ type ScalersCache struct {
}

type ScalerBuilder struct {
Scaler scalers.Scaler
TriggerName string
Factory func() (scalers.Scaler, error)
Scaler scalers.Scaler
ScalerConfig scalers.ScalerConfig
Factory func() (scalers.Scaler, *scalers.ScalerConfig, error)
}

type ScalerPair struct {
Scaler scalers.Scaler
TriggerName string
}

func (c *ScalersCache) GetScalers() []ScalerPair {
result := make([]ScalerPair, 0, len(c.Scalers))
func (c *ScalersCache) GetScalers() ([]scalers.Scaler, []scalers.ScalerConfig) {
scalersList := make([]scalers.Scaler, 0, len(c.Scalers))
configsList := make([]scalers.ScalerConfig, 0, len(c.Scalers))
for _, s := range c.Scalers {
result = append(result, ScalerPair{
Scaler: s.Scaler,
TriggerName: s.TriggerName,
})
scalersList = append(scalersList, s.Scaler)
configsList = append(configsList, s.ScalerConfig)
}
return result

return scalersList, configsList
}

func (c *ScalersCache) GetPushScalers() []scalers.PushScaler {
Expand Down Expand Up @@ -211,14 +206,15 @@ func (c *ScalersCache) refreshScaler(ctx context.Context, id int) (scalers.Scale
}

sb := c.Scalers[id]
ns, err := sb.Factory()
ns, sConfig, err := sb.Factory()
if err != nil {
return nil, err
}

c.Scalers[id] = ScalerBuilder{
Scaler: ns,
Factory: sb.Factory,
Scaler: ns,
ScalerConfig: *sConfig,
Factory: sb.Factory,
}
sb.Scaler.Close(ctx)

Expand Down
28 changes: 14 additions & 14 deletions pkg/scaling/cache/scalers_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ func TestIsScaledJobActive(t *testing.T) {
scaledJobSingle := createScaledObject(0, 100, "") // testing default = max
scalerSingle := []ScalerBuilder{{
Scaler: createScaler(ctrl, int64(20), int64(2), true, metricName),
Factory: func() (scalers.Scaler, error) {
return createScaler(ctrl, int64(20), int64(2), true, metricName), nil
Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) {
return createScaler(ctrl, int64(20), int64(2), true, metricName), &scalers.ScalerConfig{}, nil
},
}}

Expand All @@ -93,8 +93,8 @@ func TestIsScaledJobActive(t *testing.T) {
// Non-Active trigger only
scalerSingle = []ScalerBuilder{{
Scaler: createScaler(ctrl, int64(0), int64(2), false, metricName),
Factory: func() (scalers.Scaler, error) {
return createScaler(ctrl, int64(0), int64(2), false, metricName), nil
Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) {
return createScaler(ctrl, int64(0), int64(2), false, metricName), &scalers.ScalerConfig{}, nil
},
}}

Expand Down Expand Up @@ -123,23 +123,23 @@ func TestIsScaledJobActive(t *testing.T) {
scaledJob := createScaledObject(scalerTestData.MinReplicaCount, scalerTestData.MaxReplicaCount, scalerTestData.MultipleScalersCalculation)
scalersToTest := []ScalerBuilder{{
Scaler: createScaler(ctrl, scalerTestData.Scaler1QueueLength, scalerTestData.Scaler1AverageValue, scalerTestData.Scaler1IsActive, scalerTestData.MetricName),
Factory: func() (scalers.Scaler, error) {
return createScaler(ctrl, scalerTestData.Scaler1QueueLength, scalerTestData.Scaler1AverageValue, scalerTestData.Scaler1IsActive, scalerTestData.MetricName), nil
Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) {
return createScaler(ctrl, scalerTestData.Scaler1QueueLength, scalerTestData.Scaler1AverageValue, scalerTestData.Scaler1IsActive, scalerTestData.MetricName), &scalers.ScalerConfig{}, nil
},
}, {
Scaler: createScaler(ctrl, scalerTestData.Scaler2QueueLength, scalerTestData.Scaler2AverageValue, scalerTestData.Scaler2IsActive, scalerTestData.MetricName),
Factory: func() (scalers.Scaler, error) {
return createScaler(ctrl, scalerTestData.Scaler2QueueLength, scalerTestData.Scaler2AverageValue, scalerTestData.Scaler2IsActive, scalerTestData.MetricName), nil
Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) {
return createScaler(ctrl, scalerTestData.Scaler2QueueLength, scalerTestData.Scaler2AverageValue, scalerTestData.Scaler2IsActive, scalerTestData.MetricName), &scalers.ScalerConfig{}, nil
},
}, {
Scaler: createScaler(ctrl, scalerTestData.Scaler3QueueLength, scalerTestData.Scaler3AverageValue, scalerTestData.Scaler3IsActive, scalerTestData.MetricName),
Factory: func() (scalers.Scaler, error) {
return createScaler(ctrl, scalerTestData.Scaler3QueueLength, scalerTestData.Scaler3AverageValue, scalerTestData.Scaler3IsActive, scalerTestData.MetricName), nil
Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) {
return createScaler(ctrl, scalerTestData.Scaler3QueueLength, scalerTestData.Scaler3AverageValue, scalerTestData.Scaler3IsActive, scalerTestData.MetricName), &scalers.ScalerConfig{}, nil
},
}, {
Scaler: createScaler(ctrl, scalerTestData.Scaler4QueueLength, scalerTestData.Scaler4AverageValue, scalerTestData.Scaler4IsActive, scalerTestData.MetricName),
Factory: func() (scalers.Scaler, error) {
return createScaler(ctrl, scalerTestData.Scaler4QueueLength, scalerTestData.Scaler4AverageValue, scalerTestData.Scaler4IsActive, scalerTestData.MetricName), nil
Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) {
return createScaler(ctrl, scalerTestData.Scaler4QueueLength, scalerTestData.Scaler4AverageValue, scalerTestData.Scaler4IsActive, scalerTestData.MetricName), &scalers.ScalerConfig{}, nil
},
}}

Expand Down Expand Up @@ -167,8 +167,8 @@ func TestIsScaledJobActiveIfQueueEmptyButMinReplicaCountGreaterZero(t *testing.T
scaledJobSingle := createScaledObject(1, 100, "") // testing default = max
scalerSingle := []ScalerBuilder{{
Scaler: createScaler(ctrl, int64(0), int64(1), true, metricName),
Factory: func() (scalers.Scaler, error) {
return createScaler(ctrl, int64(0), int64(1), true, metricName), nil
Factory: func() (scalers.Scaler, *scalers.ScalerConfig, error) {
return createScaler(ctrl, int64(0), int64(1), true, metricName), &scalers.ScalerConfig{}, nil
},
}}

Expand Down
Loading

0 comments on commit 21a442b

Please sign in to comment.