Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Datadog scaler multi query #1

Closed
wants to merge 9 commits into from
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ To learn more about our roadmap, we recommend reading [this document](ROADMAP.md

### Improvements

- TODO ([#XXX](https://github.com/kedacore/keda/issue/XXX))
- **Datadog Scaler:** Support multi-query metrics, and aggregation ([#3423](https://github.com/kedacore/keda/issues/3423))

### Fixes

Expand Down
83 changes: 69 additions & 14 deletions pkg/scalers/datadog_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type datadogMetadata struct {
datadogSite string
query string
queryValue float64
queryAggegrator string
activationQueryValue float64
vType v2beta2.MetricTargetType
metricName string
Expand All @@ -37,6 +38,9 @@ type datadogMetadata struct {
fillValue float64
}

const maxString = "max"
const avgString = "average"

var filter *regexp.Regexp

func init() {
Expand Down Expand Up @@ -111,6 +115,20 @@ func parseDatadogMetadata(config *ScalerConfig, logger logr.Logger) (*datadogMet
return nil, fmt.Errorf("no queryValue given")
}

allowedQueryAggregators := []string{avgString, maxString}

if val, ok := config.TriggerMetadata["queryAggregator"]; ok && val != "" {
queryAggregator := strings.ToLower(val)
_, found := FindStringInSlice(allowedQueryAggregators, queryAggregator)
if found {
meta.queryAggegrator = queryAggregator
} else {
return nil, fmt.Errorf("queryAggregator has to be one of %+q", queryAggregator)
}
} else {
meta.queryAggegrator = maxString
}

meta.activationQueryValue = 0
if val, ok := config.TriggerMetadata["activationQueryValue"]; ok {
activationQueryValue, err := strconv.ParseFloat(val, 64)
Expand All @@ -136,7 +154,7 @@ func parseDatadogMetadata(config *ScalerConfig, logger logr.Logger) (*datadogMet
}
val = strings.ToLower(val)
switch val {
case "average":
case avgString:
meta.vType = v2beta2.AverageValueMetricType
case "global":
meta.vType = v2beta2.ValueMetricType
Expand Down Expand Up @@ -272,29 +290,35 @@ func (s *datadogScaler) getQueryResult(ctx context.Context) (float64, error) {

series := resp.GetSeries()

if len(series) > 1 {
return 0, fmt.Errorf("query returned more than 1 series; modify the query to return only 1 series")
}

if len(series) == 0 {
if !s.metadata.useFiller {
return 0, fmt.Errorf("no Datadog metrics returned for the given time window")
}
return s.metadata.fillValue, nil
}

points := series[0].GetPointlist()

index := len(points) - 1
if len(points) == 0 || len(points[index]) < 2 {
if !s.metadata.useFiller {
return 0, fmt.Errorf("no Datadog metrics returned for the given time window")
// Collect all latest point values from any/all series
results := make([]float64, len(series))
for i := 0; i < len(series); i++ {
points := series[i].GetPointlist()
if len(points) == 0 || len(points[0]) < 2 {
if !s.metadata.useFiller {
return 0, fmt.Errorf("no Datadog metrics returned for the given time window")
}
return s.metadata.fillValue, nil
}
return s.metadata.fillValue, nil
// Return the last point from the series
index := len(points) - 1
results[i] = *points[index][1]
}

// Return the last point from the series
return *points[index][1], nil
switch s.metadata.queryAggegrator {
case avgString:
return AvgFloatFromSlice(results), nil
default:
// Aggregate Results - default Max value:
return MaxFloatFromSlice(results), nil
}
}

// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler
Expand Down Expand Up @@ -323,3 +347,34 @@ func (s *datadogScaler) GetMetrics(ctx context.Context, metricName string, metri

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

// Takes a slice of strings, and looks for a string in it. If found it will
// return it's key/index, otherwise it will return -1 and a bool of false.
func FindStringInSlice(slice []string, val string) (int, bool) {
for i, item := range slice {
if item == val {
return i, true
}
}
return -1, false
}

// Find the largest value in a slice of floats
func MaxFloatFromSlice(results []float64) float64 {
max := results[0]
for _, result := range results {
if result > max {
max = result
}
}
return max
}

// Find the average value in a slice of floats
func AvgFloatFromSlice(results []float64) float64 {
total := 0.0
for _, result := range results {
total += result
}
return total / float64(len(results))
}
41 changes: 41 additions & 0 deletions pkg/scalers/datadog_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,41 @@ type datadogAuthMetadataTestData struct {
isError bool
}

func assertEqual(t *testing.T, a interface{}, b interface{}) {
if a == b {
return
}
t.Errorf("%v != %v", a, b)
}

func TestFindStringInSlice(t *testing.T) {
inputSlice := []string{"this", "looks", "for", "strings"}
inputValue := "looks"
expectedIndex, expectedBool := int(1), bool(true)

outputIndex, outputBool := FindStringInSlice(inputSlice, inputValue)
assertEqual(t, outputIndex, expectedIndex)
assertEqual(t, outputBool, expectedBool)
}

func TestMaxFloatFromSlice(t *testing.T) {
input := []float64{1.0, 2.0, 3.0, 4.0}
expectedOutput := float64(4.0)

output := MaxFloatFromSlice(input)

assertEqual(t, output, expectedOutput)
}

func TestAvgFloatFromSlice(t *testing.T) {
input := []float64{1.0, 2.0, 3.0, 4.0}
expectedOutput := float64(2.5)

output := AvgFloatFromSlice(input)

assertEqual(t, output, expectedOutput)
}

var testParseQueries = []datadogQueries{
{"", false, true},
// All properly formed
Expand All @@ -36,6 +71,8 @@ var testParseQueries = []datadogQueries{
{"top(per_second(abs(sum:http.requests{service:myapp,dc:us-west-2}.rollup(max, 2))), 5, 'mean', 'desc')", true, false},
{"system.cpu.user{*}.rollup(sum, 30)", true, false},
{"min:system.cpu.user{*}", true, false},
// Multi-query
{"avg:system.cpu.user{*}.rollup(sum, 30),sum:system.cpu.user{*}.rollup(30)", true, false},

// Missing filter
{"min:system.cpu.user", false, true},
Expand Down Expand Up @@ -63,6 +100,8 @@ var testDatadogMetadata = []datadogAuthMetadataTestData{

// all properly formed
{"", map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "queryValue": "7", "metricUnavailableValue": "1.5", "type": "average", "age": "60"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, false},
// Multi-query all properly formed
{"", map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count(),sum:trace.redis.command.hits{env:none,service:redis}.as_count()/2", "queryValue": "7", "queryAggregator": "average", "metricUnavailableValue": "1.5", "type": "average", "age": "60"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, false},
// default age
{"", map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "queryValue": "7", "type": "average"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, false},
// default type
Expand All @@ -77,6 +116,8 @@ var testDatadogMetadata = []datadogAuthMetadataTestData{
{"", map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "type": "average", "age": "60"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, true},
// wrong query value type
{"", map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "queryValue": "notanint", "type": "average", "age": "60"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, true},
// wrong queryAggregator value
{"", map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "queryValue": "notanint", "queryAggegrator": "1.0", "type": "average", "age": "60"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, true},
// wrong activation query value type
{"", map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "queryValue": "1", "activationQueryValue": "notanint", "type": "average", "age": "60"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, true},
// malformed query
Expand Down