Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve Pub/Sub resources from scale target's environment #5701

Merged
merged 3 commits into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ Here is an overview of all new **experimental** features:
- **General**: Add `validations.keda.sh/hpa-ownership` annotation to HPA to disable ownership validation ([#5516](https://github.com/kedacore/keda/issues/5516))
- **General**: Support csv-format for WATCH_NAMESPACE env var ([#5670](https://github.com/kedacore/keda/issues/5670))
- **Azure Event Hub Scaler**: Remove usage of checkpoint offsets to account for SDK checkpointing implementation changes ([#5574](https://github.com/kedacore/keda/issues/5574))
- **GCP Pub/Sub Scaler**: Add support for resolving resource names from the scale target's environment ([#5693](https://github.com/kedacore/keda/issues/5693))
- **GCP Stackdriver Scaler**: Add missing parameters 'rate' and 'count' for GCP Stackdriver Scaler alignment ([#5633](https://github.com/kedacore/keda/issues/5633))
- **Metrics API Scaler**: Add support for various formats: json, xml, yaml, prometheus ([#2633](https://github.com/kedacore/keda/issues/2633))
- **MongoDB Scaler**: Add scheme field support srv record ([#5544](https://github.com/kedacore/keda/issues/5544))
Expand Down
97 changes: 77 additions & 20 deletions pkg/scalers/gcp_pubsub_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,80 @@ func NewPubSubScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
}, nil
}

func parsePubSubResourceConfig(config *scalersconfig.ScalerConfig, meta *pubsubMetadata) error {
sub, subPresent := config.TriggerMetadata["subscriptionName"]
subFromEnv, subFromEnvPresent := config.TriggerMetadata["subscriptionNameFromEnv"]
if subPresent && subFromEnvPresent {
return fmt.Errorf("exactly one of subscriptionName or subscriptionNameFromEnv is allowed")
}
hasSub := subPresent || subFromEnvPresent

topic, topicPresent := config.TriggerMetadata["topicName"]
topicFromEnv, topicFromEnvPresent := config.TriggerMetadata["topicNameFromEnv"]
if topicPresent && topicFromEnvPresent {
return fmt.Errorf("exactly one of topicName or topicNameFromEnv is allowed")
}
hasTopic := topicPresent || topicFromEnvPresent

if (!hasSub && !hasTopic) || (hasSub && hasTopic) {
return fmt.Errorf("exactly one of subscription or topic name must be given")
}

if hasSub {
if subPresent {
if sub == "" {
return fmt.Errorf("no subscription name given")
}

meta.resourceName = sub
} else {
if subFromEnv == "" {
return fmt.Errorf("no environment variable name given for resolving subscription name")
}

resolvedSub, ok := config.ResolvedEnv[subFromEnv]
if !ok {
return fmt.Errorf("resolved environment doesn't contain name '%s'", subFromEnv)
}

if resolvedSub == "" {
return fmt.Errorf("resolved environment subscription name is empty")
}

meta.resourceName = config.ResolvedEnv[subFromEnv]
}

meta.resourceType = resourceTypePubSubSubscription
} else {
if topicPresent {
if topic == "" {
return fmt.Errorf("no topic name given")
}

meta.resourceName = topic
} else {
if topicFromEnv == "" {
return fmt.Errorf("no environment variable name given for resolving topic name")
}

resolvedTopic, ok := config.ResolvedEnv[topicFromEnv]
if !ok {
return fmt.Errorf("resolved environment doesn't contain name '%s'", topicFromEnv)
}

if resolvedTopic == "" {
return fmt.Errorf("resolved environment topic name is empty")
}

meta.resourceName = config.ResolvedEnv[topicFromEnv]
}

meta.resourceType = resourceTypePubSubTopic
}

return nil
}

func parsePubSubMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) (*pubsubMetadata, error) {
meta := pubsubMetadata{mode: pubSubModeSubscriptionSize, value: pubSubDefaultValue}

Expand Down Expand Up @@ -106,26 +180,9 @@ func parsePubSubMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger)

meta.aggregation = config.TriggerMetadata["aggregation"]

sub, subPresent := config.TriggerMetadata["subscriptionName"]
topic, topicPresent := config.TriggerMetadata["topicName"]
if (!subPresent && !topicPresent) || (subPresent && topicPresent) {
return nil, fmt.Errorf("exactly one of subscription or topic name must be given")
}

if subPresent {
if sub == "" {
return nil, fmt.Errorf("no subscription name given")
}

meta.resourceName = sub
meta.resourceType = resourceTypePubSubSubscription
} else {
if topic == "" {
return nil, fmt.Errorf("no topic name given")
}

meta.resourceName = topic
meta.resourceType = resourceTypePubSubTopic
err := parsePubSubResourceConfig(config, &meta)
if err != nil {
return nil, err
}

meta.activationValue = 0
Expand Down
18 changes: 15 additions & 3 deletions pkg/scalers/gcp_pubsub_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
)

var testPubSubResolvedEnv = map[string]string{
"SAMPLE_CREDS": "{}",
"SAMPLE_CREDS": "{}",
"MY_ENV_SUBSCRIPTION": "myEnvSubscription",
"MY_ENV_TOPIC": "myEnvTopic",
}

type parsePubSubMetadataTestData struct {
Expand Down Expand Up @@ -76,6 +78,14 @@ var testPubSubMetadata = []parsePubSubMetadataTestData{
{nil, map[string]string{"value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// both subscriptionSize and topicName present
{nil, map[string]string{"subscriptionSize": "7", "topicName": "mytopic", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// both subscriptionName and subscriptionNameFromEnv present
{nil, map[string]string{"subscriptionName": "mysubscription", "subscriptionNameFromEnv": "MY_ENV_SUBSCRIPTION", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// both topicName and topicNameFromEnv present
{nil, map[string]string{"topicName": "mytopic", "topicNameFromEnv": "MY_ENV_TOPIC", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, true},
// subscriptionNameFromEnv present
{nil, map[string]string{"subscriptionNameFromEnv": "MY_ENV_SUBSCRIPTION", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
// topicNameFromEnv present
{nil, map[string]string{"topicNameFromEnv": "MY_ENV_TOPIC", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
}

var gcpPubSubMetricIdentifiers = []gcpPubSubMetricIdentifier{
Expand All @@ -90,6 +100,8 @@ var gcpResourceNameTests = []gcpPubSubSubscription{
{&testPubSubMetadata[12], 1, "projects/myproject/mysubscription", ""},
{&testPubSubMetadata[17], 1, "mytopic", "myproject"},
{&testPubSubMetadata[18], 1, "projects/myproject/mytopic", ""},
{&testPubSubMetadata[24], 1, "myEnvSubscription", ""},
{&testPubSubMetadata[25], 1, "myEnvTopic", ""},
}

var gcpSubscriptionDefaults = []gcpPubSubSubscription{
Expand Down Expand Up @@ -140,7 +152,7 @@ func TestGcpPubSubGetMetricSpecForScaling(t *testing.T) {
}
}

func TestGcpPubSubSubscriptionName(t *testing.T) {
func TestGcpPubSubResourceName(t *testing.T) {
for _, testData := range gcpResourceNameTests {
meta, err := parsePubSubMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testPubSubResolvedEnv, TriggerIndex: testData.triggerIndex}, logr.Discard())
if err != nil {
Expand All @@ -150,7 +162,7 @@ func TestGcpPubSubSubscriptionName(t *testing.T) {
resourceID, projectID := getResourceData(&mockGcpPubSubScaler)

if resourceID != testData.name || projectID != testData.projectID {
t.Error("Wrong Subscription parsing:", resourceID, projectID)
t.Error("Wrong Resource parsing:", resourceID, projectID)
}
}
}
Expand Down
Loading