Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: verify that scaledobjects have no authenticationref when it is empty #3944

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions test/e2e_new/kafka_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,22 @@ func TestKafkaSourceKedaScaling(t *testing.T) {

}

func TestKafkaSourceScaledObject(t *testing.T) {
t.Parallel()

ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.WithPollTimings(5*time.Second, 4*time.Minute),
environment.Managed(t),
)

env.Test(ctx, t, features.KafkaSourceScaledObjectHasNoEmptyAuthRef())

}

func TestKafkaSourceTLSSink(t *testing.T) {

t.Parallel()
Expand Down
72 changes: 72 additions & 0 deletions test/rekt/features/keda_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"

"knative.dev/eventing-kafka-broker/control-plane/pkg/autoscaler/keda"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"

"knative.dev/eventing/test/rekt/resources/trigger"
Expand All @@ -39,6 +40,7 @@ import (
"knative.dev/eventing-kafka-broker/test/rekt/resources/kafkasink"
"knative.dev/eventing-kafka-broker/test/rekt/resources/kafkasource"
"knative.dev/eventing-kafka-broker/test/rekt/resources/kafkatopic"
kedaclient "knative.dev/eventing-kafka-broker/third_party/pkg/client/injection/client"
eventingclient "knative.dev/eventing/pkg/client/injection/client"
"knative.dev/eventing/test/rekt/resources/broker"
subscriptionresources "knative.dev/eventing/test/rekt/resources/subscription"
Expand All @@ -51,6 +53,44 @@ import (
"knative.dev/reconciler-test/pkg/resources/service"
)

func KafkaSourceScaledObjectHasNoEmptyAuthRef() *feature.Feature {
f := feature.NewFeatureNamed("KafkaSourceScalesToZeroWithKeda")

// we need to ensure that autoscaling is enabled for the rest of the feature to work
f.Prerequisite("Autoscaling is enabled", kafkafeatureflags.AutoscalingEnabled())

kafkaSource := feature.MakeRandomK8sName("kafka-source")
topic := feature.MakeRandomK8sName("topic")
kafkaSink := feature.MakeRandomK8sName("kafkaSink")
receiver := feature.MakeRandomK8sName("eventshub-receiver")

event := cetest.FullEvent()
event.SetID(uuid.New().String())

f.Setup("install kafka topic", kafkatopic.Install(topic))
f.Setup("topic is ready", kafkatopic.IsReady(topic))

// Binary content mode is default for Kafka Sink.
f.Setup("install kafkasink", kafkasink.Install(kafkaSink, topic, testpkg.BootstrapServersPlaintextArr))
f.Setup("kafkasink is ready", kafkasink.IsReady(kafkaSink))

f.Setup("install eventshub receiver", eventshub.Install(receiver, eventshub.StartReceiver))

kafkaSourceOpts := []manifest.CfgFn{
kafkasource.WithSink(service.AsDestinationRef(receiver)),
kafkasource.WithTopics([]string{topic}),
kafkasource.WithBootstrapServers(testingpkg.BootstrapServersPlaintextArr),
}

f.Setup("install kafka source", kafkasource.Install(kafkaSource, kafkaSourceOpts...))
f.Setup("kafka source is ready", kafkasource.IsReady(kafkaSource))

// after the event is sent, the source should scale down to zero replicas
f.Alpha("kafka source consumergroup scaled object").MustNot("have an authentication ref set on the trigger", verifyScaledObjectTriggerRef(getKafkaSourceCg(kafkaSource)))

return f
}

func KafkaSourceScalesToZeroWithKeda() *feature.Feature {
f := feature.NewFeatureNamed("KafkaSourceScalesToZeroWithKeda")

Expand Down Expand Up @@ -265,3 +305,35 @@ func verifyConsumerGroupReplicas(getConsumerGroupName getCgName, expectedReplica
}
}
}

func verifyScaledObjectTriggerRef(getConsumerGroupName getCgName) feature.StepFn {
return func(ctx context.Context, t feature.T) {
kedaClient := kedaclient.Get(ctx)
internalsClient := consumergroupclient.Get(ctx)
ns := environment.FromContext(ctx).Namespace()

cgName, err := getConsumerGroupName(ctx)
if err != nil {
t.Fatal(err)
}

cg, err := internalsClient.InternalV1alpha1().ConsumerGroups(ns).Get(ctx, cgName, metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}

so, err := kedaClient.KedaV1alpha1().ScaledObjects(ns).Get(ctx, keda.GenerateScaledObjectName(cg), metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}

if so.Spec.Triggers != nil {
for _, trig := range so.Spec.Triggers {
if trig.AuthenticationRef != nil {
t.Fatal("trigger on scaled object should have no authentication ref but there is an authentication ref")
}
}
}

}
}
Loading