Skip to content

Commit

Permalink
[release-v1.14] fix: only add triggerAuth to scaled objects when ther…
Browse files Browse the repository at this point in the history
…e is an actual value to reference (knative-extensions#3941) (#1119)

* fix: only add triggerAuth to scaled objects when there is an actual value to reference (knative-extensions#3941)

Signed-off-by: Calum Murray <cmurray@redhat.com>

* test: verify that scaledobjects have no authenticationref when it is empty (knative-extensions#3944)

Signed-off-by: Calum Murray <cmurray@redhat.com>

---------

Signed-off-by: Calum Murray <cmurray@redhat.com>
  • Loading branch information
Cali0707 authored Jul 8, 2024
1 parent 4da803e commit 8cbbb1a
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 4 deletions.
9 changes: 5 additions & 4 deletions control-plane/pkg/autoscaler/keda/keda.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,14 @@ func GenerateScaleTriggers(cg *kafkainternals.ConsumerGroup, triggerAuthenticati
}

trigger := kedav1alpha1.ScaleTriggers{
Type: "kafka",
Metadata: triggerMetadata,
AuthenticationRef: &kedav1alpha1.ScaledObjectAuthRef{},
Type: "kafka",
Metadata: triggerMetadata,
}

if triggerAuthentication != nil {
trigger.AuthenticationRef.Name = triggerAuthentication.Name
trigger.AuthenticationRef = &kedav1alpha1.ScaledObjectAuthRef{
Name: triggerAuthentication.Name,
}
}

triggers = append(triggers, trigger)
Expand Down
16 changes: 16 additions & 0 deletions test/e2e_new/kafka_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,22 @@ func TestKafkaSourceKedaScaling(t *testing.T) {

}

func TestKafkaSourceScaledObject(t *testing.T) {
t.Parallel()

ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.WithPollTimings(5*time.Second, 4*time.Minute),
environment.Managed(t),
)

env.Test(ctx, t, features.KafkaSourceScaledObjectHasNoEmptyAuthRef())

}

func TestKafkaSourceTLSSink(t *testing.T) {

t.Parallel()
Expand Down
72 changes: 72 additions & 0 deletions test/rekt/features/keda_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"

"knative.dev/eventing-kafka-broker/control-plane/pkg/autoscaler/keda"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"

"knative.dev/eventing/test/rekt/resources/trigger"
Expand All @@ -38,6 +39,7 @@ import (
"knative.dev/eventing-kafka-broker/test/rekt/resources/kafkasink"
"knative.dev/eventing-kafka-broker/test/rekt/resources/kafkasource"
"knative.dev/eventing-kafka-broker/test/rekt/resources/kafkatopic"
kedaclient "knative.dev/eventing-kafka-broker/third_party/pkg/client/injection/client"
eventingclient "knative.dev/eventing/pkg/client/injection/client"
"knative.dev/eventing/test/rekt/resources/broker"
"knative.dev/reconciler-test/pkg/environment"
Expand All @@ -48,6 +50,44 @@ import (
"knative.dev/reconciler-test/pkg/resources/service"
)

func KafkaSourceScaledObjectHasNoEmptyAuthRef() *feature.Feature {
f := feature.NewFeatureNamed("KafkaSourceScalesToZeroWithKeda")

// we need to ensure that autoscaling is enabled for the rest of the feature to work
f.Prerequisite("Autoscaling is enabled", kafkafeatureflags.AutoscalingEnabled())

kafkaSource := feature.MakeRandomK8sName("kafka-source")
topic := feature.MakeRandomK8sName("topic")
kafkaSink := feature.MakeRandomK8sName("kafkaSink")
receiver := feature.MakeRandomK8sName("eventshub-receiver")

event := cetest.FullEvent()
event.SetID(uuid.New().String())

f.Setup("install kafka topic", kafkatopic.Install(topic))
f.Setup("topic is ready", kafkatopic.IsReady(topic))

// Binary content mode is default for Kafka Sink.
f.Setup("install kafkasink", kafkasink.Install(kafkaSink, topic, testpkg.BootstrapServersPlaintextArr))
f.Setup("kafkasink is ready", kafkasink.IsReady(kafkaSink))

f.Setup("install eventshub receiver", eventshub.Install(receiver, eventshub.StartReceiver))

kafkaSourceOpts := []manifest.CfgFn{
kafkasource.WithSink(service.AsDestinationRef(receiver)),
kafkasource.WithTopics([]string{topic}),
kafkasource.WithBootstrapServers(testingpkg.BootstrapServersPlaintextArr),
}

f.Setup("install kafka source", kafkasource.Install(kafkaSource, kafkaSourceOpts...))
f.Setup("kafka source is ready", kafkasource.IsReady(kafkaSource))

// after the event is sent, the source should scale down to zero replicas
f.Alpha("kafka source consumergroup scaled object").MustNot("have an authentication ref set on the trigger", verifyScaledObjectTriggerRef(getKafkaSourceCg(kafkaSource)))

return f
}

func KafkaSourceScalesToZeroWithKeda() *feature.Feature {
f := feature.NewFeatureNamed("KafkaSourceScalesToZeroWithKeda")

Expand Down Expand Up @@ -205,3 +245,35 @@ func verifyConsumerGroupReplicas(getConsumerGroupName getCgName, expectedReplica
}
}
}

func verifyScaledObjectTriggerRef(getConsumerGroupName getCgName) feature.StepFn {
return func(ctx context.Context, t feature.T) {
kedaClient := kedaclient.Get(ctx)
internalsClient := consumergroupclient.Get(ctx)
ns := environment.FromContext(ctx).Namespace()

cgName, err := getConsumerGroupName(ctx)
if err != nil {
t.Fatal(err)
}

cg, err := internalsClient.InternalV1alpha1().ConsumerGroups(ns).Get(ctx, cgName, metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}

so, err := kedaClient.KedaV1alpha1().ScaledObjects(ns).Get(ctx, keda.GenerateScaledObjectName(cg), metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}

if so.Spec.Triggers != nil {
for _, trig := range so.Spec.Triggers {
if trig.AuthenticationRef != nil {
t.Fatal("trigger on scaled object should have no authentication ref but there is an authentication ref")
}
}
}

}
}

0 comments on commit 8cbbb1a

Please sign in to comment.