Skip to content

Commit

Permalink
Redis cluster scalers (#1437)
Browse files Browse the repository at this point in the history
  • Loading branch information
goku321 committed Jan 6, 2021
1 parent 54f8db3 commit d3bacdc
Show file tree
Hide file tree
Showing 9 changed files with 1,295 additions and 113 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
### New
- Can use Pod Identity with Azure Event Hub scaler ([#994](https://github.com/kedacore/keda/issues/994))
- Introducing InfluxDB scaler ([#1239](https://github.com/kedacore/keda/issues/1239))
- Add Redis cluster support for Redis list and Redis streams scalers.

### Improvements
- Support add ScaledJob's label to its job ([#1311](https://github.com/kedacore/keda/issues/1311))
Expand Down
276 changes: 212 additions & 64 deletions pkg/scalers/redis_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/tls"
"fmt"
"strconv"
"strings"

"github.com/go-redis/redis"
v2beta2 "k8s.io/api/autoscaling/v2beta2"
Expand All @@ -23,16 +24,19 @@ const (
defaultEnableTLS = false
)

type redisAddressParser func(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error)

type redisScaler struct {
metadata *redisMetadata
client *redis.Client
metadata *redisMetadata
closeFn func() error
getListLengthFn func() (int64, error)
}

type redisConnectionInfo struct {
address string
addresses []string
password string
host string
port string
hosts []string
ports []string
enableTLS bool
}

Expand All @@ -46,31 +50,96 @@ type redisMetadata struct {
var redisLog = logf.Log.WithName("redis_scaler")

// NewRedisScaler creates a new redisScaler
func NewRedisScaler(config *ScalerConfig) (Scaler, error) {
meta, err := parseRedisMetadata(config)
func NewRedisScaler(isClustered bool, config *ScalerConfig) (Scaler, error) {
luaScript := `
local listName = KEYS[1]
local listType = redis.call('type', listName).ok
local cmd = {
zset = 'zcard',
set = 'scard',
list = 'llen',
hash = 'hlen',
none = 'llen'
}
return redis.call(cmd[listType], listName)
`
if isClustered {
meta, err := parseRedisMetadata(config, parseRedisClusterAddress)
if err != nil {
return nil, fmt.Errorf("error parsing redis metadata: %s", err)
}
return createClusteredRedisScaler(meta, luaScript)
}
meta, err := parseRedisMetadata(config, parseRedisAddress)
if err != nil {
return nil, fmt.Errorf("error parsing redis metadata: %s", err)
}
options := &redis.Options{
Addr: meta.connectionInfo.address,
Password: meta.connectionInfo.password,
DB: meta.databaseIndex,
return createRedisScaler(meta, luaScript)
}

func createClusteredRedisScaler(meta *redisMetadata, script string) (Scaler, error) {
client, err := getRedisClusterClient(meta.connectionInfo)
if err != nil {
return nil, fmt.Errorf("connection to redis cluster failed: %s", err)
}

if meta.connectionInfo.enableTLS {
options.TLSConfig = &tls.Config{
InsecureSkipVerify: meta.connectionInfo.enableTLS,
closeFn := func() error {
if err := client.Close(); err != nil {
redisLog.Error(err, "error closing redis client")
return err
}
return nil
}

listLengthFn := func() (int64, error) {
cmd := client.Eval(script, []string{meta.listName})
if cmd.Err() != nil {
return -1, cmd.Err()
}

return cmd.Int64()
}

return &redisScaler{
metadata: meta,
closeFn: closeFn,
getListLengthFn: listLengthFn,
}, nil
}

func createRedisScaler(meta *redisMetadata, script string) (Scaler, error) {
client, err := getRedisClient(meta.connectionInfo, meta.databaseIndex)
if err != nil {
return nil, fmt.Errorf("connection to redis failed: %s", err)
}

closeFn := func() error {
if err := client.Close(); err != nil {
redisLog.Error(err, "error closing redis client")
return err
}
return nil
}

listLengthFn := func() (int64, error) {
cmd := client.Eval(script, []string{meta.listName})
if cmd.Err() != nil {
return -1, cmd.Err()
}

return cmd.Int64()
}

return &redisScaler{
metadata: meta,
client: redis.NewClient(options),
metadata: meta,
closeFn: closeFn,
getListLengthFn: listLengthFn,
}, nil
}

func parseRedisMetadata(config *ScalerConfig) (*redisMetadata, error) {
connInfo, err := parseRedisAddress(config.TriggerMetadata, config.ResolvedEnv, config.AuthParams)
func parseRedisMetadata(config *ScalerConfig, parserFn redisAddressParser) (*redisMetadata, error) {
connInfo, err := parserFn(config.TriggerMetadata, config.ResolvedEnv, config.AuthParams)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -107,7 +176,7 @@ func parseRedisMetadata(config *ScalerConfig) (*redisMetadata, error) {

// IsActive checks if there is any element in the Redis list
func (s *redisScaler) IsActive(ctx context.Context) (bool, error) {
length, err := getRedisListLength(s.client, s.metadata.listName)
length, err := s.getListLengthFn()

if err != nil {
redisLog.Error(err, "error")
Expand All @@ -118,15 +187,7 @@ func (s *redisScaler) IsActive(ctx context.Context) (bool, error) {
}

func (s *redisScaler) Close() error {
if s.client != nil {
err := s.client.Close()
if err != nil {
redisLog.Error(err, "error closing redis client")
return err
}
}

return nil
return s.closeFn()
}

// GetMetricSpecForScaling returns the metric spec for the HPA
Expand All @@ -149,7 +210,7 @@ func (s *redisScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {

// GetMetrics connects to Redis and finds the length of the list
func (s *redisScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
listLen, err := getRedisListLength(s.client, s.metadata.listName)
listLen, err := s.getListLengthFn()

if err != nil {
redisLog.Error(err, "error getting list length")
Expand All @@ -165,63 +226,40 @@ func (s *redisScaler) GetMetrics(ctx context.Context, metricName string, metricS
return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

func getRedisListLength(client *redis.Client, listName string) (int64, error) {
luaScript := `
local listName = KEYS[1]
local listType = redis.call('type', listName).ok
local cmd = {
zset = 'zcard',
set = 'scard',
list = 'llen',
hash = 'hlen',
none = 'llen'
}
return redis.call(cmd[listType], listName)
`

cmd := client.Eval(luaScript, []string{listName})
if cmd.Err() != nil {
return -1, cmd.Err()
}

return cmd.Int64()
}

func parseRedisAddress(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) {
info := redisConnectionInfo{}
switch {
case authParams["address"] != "":
info.address = authParams["address"]
info.addresses = append(info.addresses, authParams["address"])
case metadata["address"] != "":
info.address = metadata["address"]
info.addresses = append(info.addresses, metadata["address"])
case metadata["addressFromEnv"] != "":
info.address = resolvedEnv[metadata["addressFromEnv"]]
info.addresses = append(info.addresses, resolvedEnv[metadata["addressFromEnv"]])
default:
switch {
case authParams["host"] != "":
info.host = authParams["host"]
info.hosts = append(info.hosts, authParams["host"])
case metadata["host"] != "":
info.host = metadata["host"]
info.hosts = append(info.hosts, metadata["host"])
case metadata["hostFromEnv"] != "":
info.host = resolvedEnv[metadata["hostFromEnv"]]
info.hosts = append(info.hosts, resolvedEnv[metadata["hostFromEnv"]])
}

switch {
case authParams["port"] != "":
info.port = authParams["port"]
info.ports = append(info.ports, authParams["port"])
case metadata["port"] != "":
info.port = metadata["port"]
info.ports = append(info.ports, metadata["port"])
case metadata["portFromEnv"] != "":
info.port = resolvedEnv[metadata["portFromEnv"]]
info.ports = append(info.ports, resolvedEnv[metadata["portFromEnv"]])
}

if len(info.host) != 0 && len(info.port) != 0 {
info.address = fmt.Sprintf("%s:%s", info.host, info.port)
if len(info.hosts) != 0 && len(info.ports) != 0 {
info.addresses = append(info.addresses, fmt.Sprintf("%s:%s", info.hosts[0], info.ports[0]))
}
}

if len(info.address) == 0 {
if len(info.addresses) == 0 || len(info.addresses[0]) == 0 {
return info, fmt.Errorf("no address or host given. address should be in the format of host:port or you should set the host/port values")
}

Expand All @@ -242,3 +280,113 @@ func parseRedisAddress(metadata, resolvedEnv, authParams map[string]string) (red

return info, nil
}

func parseRedisClusterAddress(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) {
info := redisConnectionInfo{}
switch {
case authParams["addresses"] != "":
info.addresses = splitAndTrim(authParams["addresses"])
case metadata["addresses"] != "":
info.addresses = splitAndTrim(metadata["addresses"])
case metadata["addressesFromEnv"] != "":
info.addresses = splitAndTrim(resolvedEnv[metadata["addressesFromEnv"]])
default:
switch {
case authParams["hosts"] != "":
info.hosts = splitAndTrim(authParams["hosts"])
case metadata["hosts"] != "":
info.hosts = splitAndTrim(metadata["hosts"])
case metadata["hostsFromEnv"] != "":
info.hosts = splitAndTrim(resolvedEnv[metadata["hostsFromEnv"]])
}

switch {
case authParams["ports"] != "":
info.ports = splitAndTrim(authParams["ports"])
case metadata["ports"] != "":
info.ports = splitAndTrim(metadata["ports"])
case metadata["portsFromEnv"] != "":
info.ports = splitAndTrim(resolvedEnv[metadata["portsFromEnv"]])
}

if len(info.hosts) != 0 && len(info.ports) != 0 {
if len(info.hosts) != len(info.ports) {
return info, fmt.Errorf("not enough hosts or ports given. number of hosts should be equal to the number of ports")
}
for i := range info.hosts {
info.addresses = append(info.addresses, fmt.Sprintf("%s:%s", info.hosts[i], info.ports[i]))
}
}
}

if len(info.addresses) == 0 {
return info, fmt.Errorf("no addresses or hosts given. address should be a comma separated list of host:port or set the host/port values")
}

if authParams["password"] != "" {
info.password = authParams["password"]
} else if metadata["passwordFromEnv"] != "" {
info.password = resolvedEnv[metadata["passwordFromEnv"]]
}

info.enableTLS = defaultEnableTLS
if val, ok := metadata["enableTLS"]; ok {
tls, err := strconv.ParseBool(val)
if err != nil {
return info, fmt.Errorf("enableTLS parsing error %s", err.Error())
}
info.enableTLS = tls
}

return info, nil
}

func getRedisClusterClient(info redisConnectionInfo) (*redis.ClusterClient, error) {
options := &redis.ClusterOptions{
Addrs: info.addresses,
Password: info.password,
}
if info.enableTLS {
options.TLSConfig = &tls.Config{
InsecureSkipVerify: info.enableTLS,
}
}

// confirm if connected
c := redis.NewClusterClient(options)
err := c.Ping().Err()
if err != nil {
return nil, err
}
return c, nil
}

func getRedisClient(info redisConnectionInfo, dbIndex int) (*redis.Client, error) {
options := &redis.Options{
Addr: info.addresses[0],
Password: info.password,
DB: dbIndex,
}
if info.enableTLS {
options.TLSConfig = &tls.Config{
InsecureSkipVerify: info.enableTLS,
}
}

// confirm if connected
c := redis.NewClient(options)
err := c.Ping().Err()
if err != nil {
return nil, err
}
return c, nil
}

// Splits a string separated by comma and trims space from all the elements.
func splitAndTrim(s string) []string {
x := strings.Split(s, ",")
for i := range x {
x[i] = strings.Trim(x[i], " ")
}
return x
}
Loading

0 comments on commit d3bacdc

Please sign in to comment.