Skip to content

Commit

Permalink
fix(kafka): Make parsing partitionLimitation more resilient against ws (
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreasBergmeier6176 authored and xoanmm committed Mar 22, 2023
1 parent e17ac4a commit d64c57e
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 8 deletions.
3 changes: 2 additions & 1 deletion pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ func parseKafkaMetadata(config *ScalerConfig, logger logr.Logger) (kafkaMetadata
}

meta.partitionLimitation = nil
if config.TriggerMetadata["partitionLimitation"] != "" {
partitionLimitationMetadata := strings.TrimSpace(config.TriggerMetadata["partitionLimitation"])
if partitionLimitationMetadata != "" {
if meta.topic == "" {
logger.V(1).Info("no specific topic set, ignoring partitionLimitation setting")
} else {
Expand Down
19 changes: 12 additions & 7 deletions pkg/scalers/kafka_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ var parseKafkaMetadataTestDataset = []parseKafkaMetadataTestData{
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, false, 1, []string{"foobar:9092"}, "my-group", "", nil, offsetResetPolicy("latest"), false, false},
// success, ignore partitionLimitation if no topic
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": "1,2,3,4,5,6"}, false, 1, []string{"foobar:9092"}, "my-group", "", nil, offsetResetPolicy("latest"), false, false},
// success, no limitation with whitespaced limitation value
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": " "}, false, 1, []string{"foobar:9092"}, "my-group", "", nil, offsetResetPolicy("latest"), false, false},
// success, no limitation
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": ""}, false, 1, []string{"foobar:9092"}, "my-group", "", nil, offsetResetPolicy("latest"), false, false},
// failure, version not supported
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "version": "1.2.3.4"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false},
// failure, lagThreshold is negative value
Expand Down Expand Up @@ -169,8 +173,8 @@ var parseKafkaOAuthbreakerAuthParamsTestDataset = []parseKafkaAuthParamsTestData
}

var kafkaMetricIdentifiers = []kafkaMetricIdentifier{
{&parseKafkaMetadataTestDataset[8], 0, "s0-kafka-my-topic"},
{&parseKafkaMetadataTestDataset[8], 1, "s1-kafka-my-topic"},
{&parseKafkaMetadataTestDataset[10], 0, "s0-kafka-my-topic"},
{&parseKafkaMetadataTestDataset[10], 1, "s1-kafka-my-topic"},
{&parseKafkaMetadataTestDataset[2], 1, "s1-kafka-my-group-topics"},
}

Expand Down Expand Up @@ -309,8 +313,9 @@ func TestGetTopicPartitions(t *testing.T) {
partitionIds []int32
exp map[string][]int32
}{
{"success_all_partitions", map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "1,2"}, []int32{1, 2}, map[string][]int32{"my-topic": {1, 2}}},
{"success_partial_partitions", map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "1,2,3"}, []int32{1, 2, 3, 4, 5, 6}, map[string][]int32{"my-topic": {1, 2, 3}}},
{"success_all_partitions_explicit", map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "1,2"}, []int32{1, 2}, map[string][]int32{"my-topic": {1, 2}}},
{"success_partial_partitions_explicit", map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "1,2,3"}, []int32{1, 2, 3, 4, 5, 6}, map[string][]int32{"my-topic": {1, 2, 3}}},
{"success_all_partitions_implicit", map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": ""}, []int32{1, 2, 3, 4, 5, 6}, map[string][]int32{"my-topic": {1, 2, 3, 4, 5, 6}}},
}

for _, tt := range testData {
Expand All @@ -321,10 +326,10 @@ func TestGetTopicPartitions(t *testing.T) {
}
mockKafkaScaler := kafkaScaler{"", meta, nil, &MockClusterAdmin{partitionIds: tt.partitionIds}, logr.Discard(), make(map[string]map[int32]int64)}

patitions, err := mockKafkaScaler.getTopicPartitions()
partitions, err := mockKafkaScaler.getTopicPartitions()

if !reflect.DeepEqual(tt.exp, patitions) {
t.Errorf("Expected %v but got %v\n", tt.exp, patitions)
if !reflect.DeepEqual(tt.exp, partitions) {
t.Errorf("Expected %v but got %v\n", tt.exp, partitions)
}

if err != nil {
Expand Down

0 comments on commit d64c57e

Please sign in to comment.