From d64c57e1f155c6d2c43e7cd4f225ef65a66c3c5e Mon Sep 17 00:00:00 2001
From: Andreas Bergmeier <51448674+AndreasBergmeier6176@users.noreply.github.com>
Date: Wed, 8 Mar 2023 13:04:03 +0100
Subject: [PATCH] fix(kafka): Make parsing partitionLimitation more resilient
 against ws (#4333)

---
 pkg/scalers/kafka_scaler.go      |  3 ++-
 pkg/scalers/kafka_scaler_test.go | 19 ++++++++++++-------
 2 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go
index 15287a6088c..3e3ff1174c1 100644
--- a/pkg/scalers/kafka_scaler.go
+++ b/pkg/scalers/kafka_scaler.go
@@ -212,7 +212,8 @@ func parseKafkaMetadata(config *ScalerConfig, logger logr.Logger) (kafkaMetadata
 	}
 
 	meta.partitionLimitation = nil
-	if config.TriggerMetadata["partitionLimitation"] != "" {
+	partitionLimitationMetadata := strings.TrimSpace(config.TriggerMetadata["partitionLimitation"])
+	if partitionLimitationMetadata != "" {
 		if meta.topic == "" {
 			logger.V(1).Info("no specific topic set, ignoring partitionLimitation setting")
 		} else {
diff --git a/pkg/scalers/kafka_scaler_test.go b/pkg/scalers/kafka_scaler_test.go
index 23a64bb2d2e..7a1df4b7d62 100644
--- a/pkg/scalers/kafka_scaler_test.go
+++ b/pkg/scalers/kafka_scaler_test.go
@@ -62,6 +62,10 @@ var parseKafkaMetadataTestDataset = []parseKafkaMetadataTestData{
 	{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, false, 1, []string{"foobar:9092"}, "my-group", "", nil, offsetResetPolicy("latest"), false, false},
 	// success, ignore partitionLimitation if no topic
 	{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": "1,2,3,4,5,6"}, false, 1, []string{"foobar:9092"}, "my-group", "", nil, offsetResetPolicy("latest"), false, false},
+	// success, no limitation with whitespaced limitation value
+	{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": " "}, false, 1, []string{"foobar:9092"}, "my-group", "", nil, offsetResetPolicy("latest"), false, false},
+	// success, no limitation
+	{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": ""}, false, 1, []string{"foobar:9092"}, "my-group", "", nil, offsetResetPolicy("latest"), false, false},
 	// failure, version not supported
 	{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "version": "1.2.3.4"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false},
 	// failure, lagThreshold is negative value
@@ -169,8 +173,8 @@ var parseKafkaOAuthbearerAuthParamsTestDataset = []parseKafkaAuthParamsTestData
 }
 
 var kafkaMetricIdentifiers = []kafkaMetricIdentifier{
-	{&parseKafkaMetadataTestDataset[8], 0, "s0-kafka-my-topic"},
-	{&parseKafkaMetadataTestDataset[8], 1, "s1-kafka-my-topic"},
+	{&parseKafkaMetadataTestDataset[10], 0, "s0-kafka-my-topic"},
+	{&parseKafkaMetadataTestDataset[10], 1, "s1-kafka-my-topic"},
 	{&parseKafkaMetadataTestDataset[2], 1, "s1-kafka-my-group-topics"},
 }
 
@@ -309,8 +313,9 @@ func TestGetTopicPartitions(t *testing.T) {
 		partitionIds []int32
 		exp          map[string][]int32
 	}{
-		{"success_all_partitions", map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "1,2"}, []int32{1, 2}, map[string][]int32{"my-topic": {1, 2}}},
-		{"success_partial_partitions", map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "1,2,3"}, []int32{1, 2, 3, 4, 5, 6}, map[string][]int32{"my-topic": {1, 2, 3}}},
+		{"success_all_partitions_explicit", map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "1,2"}, []int32{1, 2}, map[string][]int32{"my-topic": {1, 2}}},
+		{"success_partial_partitions_explicit", map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "1,2,3"}, []int32{1, 2, 3, 4, 5, 6}, map[string][]int32{"my-topic": {1, 2, 3}}},
+		{"success_all_partitions_implicit", map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": ""}, []int32{1, 2, 3, 4, 5, 6}, map[string][]int32{"my-topic": {1, 2, 3, 4, 5, 6}}},
 	}
 
 	for _, tt := range testData {
@@ -321,10 +326,10 @@ func TestGetTopicPartitions(t *testing.T) {
 		}
 		mockKafkaScaler := kafkaScaler{"", meta, nil, &MockClusterAdmin{partitionIds: tt.partitionIds}, logr.Discard(), make(map[string]map[int32]int64)}
 
-		patitions, err := mockKafkaScaler.getTopicPartitions()
+		partitions, err := mockKafkaScaler.getTopicPartitions()
 
-		if !reflect.DeepEqual(tt.exp, patitions) {
-			t.Errorf("Expected %v but got %v\n", tt.exp, patitions)
+		if !reflect.DeepEqual(tt.exp, partitions) {
+			t.Errorf("Expected %v but got %v\n", tt.exp, partitions)
 		}
 		if err != nil {