From 1e654a25a9604cf619e1ab0a8a728ce0a0be8297 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Christoph=20St=C3=A4bler?=
Date: Thu, 15 Aug 2024 16:21:12 +0200
Subject: [PATCH] Provision EventPolicies in kafkasink contract

---
 .../pkg/reconciler/sink/controller.go      |   3 +
 .../pkg/reconciler/sink/controller_test.go |   1 +
 .../pkg/reconciler/sink/kafka_sink.go      | 119 ++++++++++--------
 .../pkg/reconciler/sink/kafka_sink_test.go |  13 +-
 4 files changed, 77 insertions(+), 59 deletions(-)

diff --git a/control-plane/pkg/reconciler/sink/controller.go b/control-plane/pkg/reconciler/sink/controller.go
index 43f2d32675..01887f27e1 100644
--- a/control-plane/pkg/reconciler/sink/controller.go
+++ b/control-plane/pkg/reconciler/sink/controller.go
@@ -19,6 +19,7 @@ package sink
 import (
 	"context"
 	"fmt"
+	eventpolicyinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy"
 	"net"
 	"net/http"
 
@@ -53,6 +54,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
 	logger := logging.FromContext(ctx)
 
 	configmapInformer := configmapinformer.Get(ctx)
+	eventPolicyInformer := eventpolicyinformer.Get(ctx)
 
 	clientPool := clientpool.Get(ctx)
 
@@ -68,6 +70,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
 			ReceiverLabel:      base.SinkReceiverLabel,
 		},
 		ConfigMapLister:      configmapInformer.Lister(),
+		EventPolicyLister:    eventPolicyInformer.Lister(),
 		GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
 		Env:                  configs,
 	}
diff --git a/control-plane/pkg/reconciler/sink/controller_test.go b/control-plane/pkg/reconciler/sink/controller_test.go
index d1caf7ff12..0fe78cde70 100644
--- a/control-plane/pkg/reconciler/sink/controller_test.go
+++ b/control-plane/pkg/reconciler/sink/controller_test.go
@@ -27,6 +27,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	reconcilertesting "knative.dev/pkg/reconciler/testing"
 
+	_ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake"
 	_ "knative.dev/pkg/client/injection/kube/client/fake"
 	_ "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/fake"
 	_ "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/fake"
diff --git a/control-plane/pkg/reconciler/sink/kafka_sink.go b/control-plane/pkg/reconciler/sink/kafka_sink.go
index bd95c5947f..30a8a7efc2 100644
--- a/control-plane/pkg/reconciler/sink/kafka_sink.go
+++ b/control-plane/pkg/reconciler/sink/kafka_sink.go
@@ -19,8 +19,11 @@ package sink
 import (
 	"context"
 	"fmt"
+	corev1 "k8s.io/api/core/v1"
+	eventingv1alpha1listers "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1"
 	"time"
 
+	"k8s.io/utils/ptr"
 	"knative.dev/eventing/pkg/auth"
 	"knative.dev/pkg/logging"
 
@@ -66,7 +69,8 @@ type Reconciler struct {
 
 	Resolver *resolver.URIResolver
 
-	ConfigMapLister corelisters.ConfigMapLister
+	ConfigMapLister   corelisters.ConfigMapLister
+	EventPolicyLister eventingv1alpha1listers.EventPolicyLister
 
 	// GetKafkaClusterAdmin creates new sarama ClusterAdmin. It's convenient to add this as Reconciler field so that we can
 	// mock the function used during the reconciliation loop.
@@ -84,6 +88,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, ks *eventing.KafkaSink)
 
 func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink) error {
 	logger := kafkalogging.CreateReconcileMethodLogger(ctx, ks)
+	features := feature.FromContext(ctx)
 
 	statusConditionManager := base.StatusConditionManager{
 		Object: ks,
@@ -180,39 +185,20 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink)
 		zap.Any("contract", ct),
 	)
 
-	// Get sink configuration.
-	sinkConfig := &contract.Resource{
-		Uid:    string(ks.UID),
-		Topics: []string{ks.Spec.Topic},
-		Ingress: &contract.Ingress{
-			Path:                       receiver.PathFromObject(ks),
-			ContentMode:                coreconfig.ContentModeFromString(*ks.Spec.ContentMode),
-			EnableAutoCreateEventTypes: feature.FromContext(ctx).IsEnabled(feature.EvenTypeAutoCreate),
-		},
-		BootstrapServers: kafka.BootstrapServersCommaSeparated(ks.Spec.BootstrapServers),
-		Reference: &contract.Reference{
-			Uuid:         string(ks.GetUID()),
-			Namespace:    ks.GetNamespace(),
-			Name:         ks.GetName(),
-			Kind:         "KafkaSink",
-			GroupVersion: eventingv1alpha1.SchemeGroupVersion.String(),
-		},
-	}
-	if ks.Spec.HasAuthConfig() {
-		sinkConfig.Auth = &contract.Resource_AuthSecret{
-			AuthSecret: &contract.Reference{
-				Uuid:      string(secret.UID),
-				Namespace: secret.Namespace,
-				Name:      secret.Name,
-				Version:   secret.ResourceVersion,
-			},
-		}
+	var audience *string
+	if features.IsOIDCAuthentication() {
+		audience = ptr.To(auth.GetAudience(eventing.SchemeGroupVersion.WithKind("KafkaSink"), ks.ObjectMeta))
+		logging.FromContext(ctx).Debugw("Setting the kafkasinks audience", zap.String("audience", *audience))
+	} else {
+		logging.FromContext(ctx).Debug("Clearing the kafkasinks audience as OIDC is not enabled")
+		audience = nil
 	}
 
-	if ks.Status.Address != nil && ks.Status.Address.Audience != nil {
-		sinkConfig.Ingress.Audience = *ks.Status.Address.Audience
+	// Get sink configuration.
+	sinkConfig, err := r.getSinkContractResource(ctx, ks, secret, audience)
+	if err != nil {
+		return statusConditionManager.FailedToResolveConfig(err)
 	}
-
 	statusConditionManager.ConfigResolved()
 
 	sinkIndex := coreconfig.FindResource(ct, ks.UID)
@@ -245,7 +231,6 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink)
 
 	logger.Debug("Updated receiver pod annotation")
 
-	features := feature.FromContext(ctx)
 	var addressableStatus duckv1.AddressStatus
 	if features.IsPermissiveTransportEncryption() {
 		caCerts, err := r.getCaCerts()
@@ -253,8 +238,8 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink)
 			return err
 		}
 
-		httpAddress := receiver.HTTPAddress(r.IngressHost, nil, ks)
-		httpsAddress := receiver.HTTPSAddress(r.IngressHost, nil, ks, caCerts)
+		httpAddress := receiver.HTTPAddress(r.IngressHost, audience, ks)
+		httpsAddress := receiver.HTTPSAddress(r.IngressHost, audience, ks, caCerts)
 		// Permissive mode:
 		//   - status.address http address with path-based routing
 		//   - status.addresses:
@@ -271,14 +256,14 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink)
 		if err != nil {
 			return err
 		}
-		httpsAddress := receiver.HTTPSAddress(r.IngressHost, nil, ks, caCerts)
+		httpsAddress := receiver.HTTPSAddress(r.IngressHost, audience, ks, caCerts)
 		addressableStatus.Address = &httpsAddress
 		addressableStatus.Addresses = []duckv1.Addressable{httpsAddress}
 
 	} else {
 		// Disabled mode:
 		// Unchange
-		httpAddress := receiver.HTTPAddress(r.IngressHost, nil, ks)
+		httpAddress := receiver.HTTPAddress(r.IngressHost, audience, ks)
 
 		addressableStatus.Address = &httpAddress
 		addressableStatus.Addresses = []duckv1.Addressable{httpAddress}
@@ -301,25 +286,6 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink)
 
 	ks.Status.AddressStatus = addressableStatus
 
-	if features.IsOIDCAuthentication() {
-		audience := auth.GetAudience(eventing.SchemeGroupVersion.WithKind("KafkaSink"), ks.ObjectMeta)
-		logging.FromContext(ctx).Debugw("Setting the kafkasinks audience", zap.String("audience", audience))
-		ks.Status.Address.Audience = &audience
-
-		for i := range ks.Status.Addresses {
-			ks.Status.Addresses[i].Audience = &audience
-		}
-	} else {
-		logging.FromContext(ctx).Debug("Clearing the kafkasinks audience as OIDC is not enabled")
-		if ks.Status.Address != nil {
-			ks.Status.Address.Audience = nil
-		}
-
-		for i := range ks.Status.Addresses {
-			ks.Status.Addresses[i].Audience = nil
-		}
-	}
-
 	ks.GetConditionSet().Manage(ks.GetStatus()).MarkTrue(base.ConditionAddressable)
 
 	return nil
@@ -461,3 +427,46 @@ func (r *Reconciler) setTrustBundles(ct *contract.Contract) error {
 	ct.TrustBundles = tb
 	return nil
 }
+
+func (r *Reconciler) getSinkContractResource(ctx context.Context, kafkaSink *eventingv1alpha1.KafkaSink, secret *corev1.Secret, audience *string) (*contract.Resource, error) {
+	features := feature.FromContext(ctx)
+	sinkConfig := &contract.Resource{
+		Uid:    string(kafkaSink.UID),
+		Topics: []string{kafkaSink.Spec.Topic},
+		Ingress: &contract.Ingress{
+			Path:                       receiver.PathFromObject(kafkaSink),
+			ContentMode:                coreconfig.ContentModeFromString(*kafkaSink.Spec.ContentMode),
+			EnableAutoCreateEventTypes: features.IsEnabled(feature.EvenTypeAutoCreate),
+		},
+		BootstrapServers: kafka.BootstrapServersCommaSeparated(kafkaSink.Spec.BootstrapServers),
+		Reference: &contract.Reference{
+			Uuid:         string(kafkaSink.GetUID()),
+			Namespace:    kafkaSink.GetNamespace(),
+			Name:         kafkaSink.GetName(),
+			Kind:         "KafkaSink",
+			GroupVersion: eventingv1alpha1.SchemeGroupVersion.String(),
+		},
+	}
+	if kafkaSink.Spec.HasAuthConfig() {
+		sinkConfig.Auth = &contract.Resource_AuthSecret{
+			AuthSecret: &contract.Reference{
+				Uuid:      string(secret.UID),
+				Namespace: secret.Namespace,
+				Name:      secret.Name,
+				Version:   secret.ResourceVersion,
+			},
+		}
+	}
+
+	if audience != nil {
+		sinkConfig.Ingress.Audience = *audience
+	}
+
+	eventPolicies, err := coreconfig.EventPoliciesFromAppliedEventPoliciesStatus(kafkaSink.Status.AppliedEventPoliciesStatus, r.EventPolicyLister, kafkaSink.Namespace, features)
+	if err != nil {
+		return nil, fmt.Errorf("could not get eventpolicies from kafkasink status: %w", err)
+	}
+	sinkConfig.Ingress.EventPolicies = eventPolicies
+
+	return sinkConfig, nil
+}
diff --git a/control-plane/pkg/reconciler/sink/kafka_sink_test.go b/control-plane/pkg/reconciler/sink/kafka_sink_test.go
index 1583b4ea0e..494d38a96d 100644
--- a/control-plane/pkg/reconciler/sink/kafka_sink_test.go
+++ b/control-plane/pkg/reconciler/sink/kafka_sink_test.go
@@ -1331,9 +1331,13 @@ func sinkReconciliation(t *testing.T, format string, env config.Env) {
 					ConfigMapUpdate(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, env.ContractConfigMapFormat, &contract.Contract{
 						Resources: []*contract.Resource{
 							{
-								Uid:     SinkUUID,
-								Topics:  []string{SinkTopic()},
-								Ingress: &contract.Ingress{ContentMode: contract.ContentMode_BINARY, Path: receiver.Path(SinkNamespace, SinkName)},
+								Uid:    SinkUUID,
+								Topics: []string{SinkTopic()},
+								Ingress: &contract.Ingress{
+									ContentMode: contract.ContentMode_BINARY,
+									Path:        receiver.Path(SinkNamespace, SinkName),
+									Audience:    sinkAudience,
+								},
 								BootstrapServers: bootstrapServers,
 								Reference:        SinkReference(),
 							},
@@ -1755,7 +1759,8 @@ func useTable(t *testing.T, table TableTest, env *config.Env) {
 			DataPlaneNamespace:          env.SystemNamespace,
 			ReceiverLabel:               base.SinkReceiverLabel,
 		},
-		ConfigMapLister: listers.GetConfigMapLister(),
+		ConfigMapLister:   listers.GetConfigMapLister(),
+		EventPolicyLister: listers.GetEventPolicyLister(),
 		GetKafkaClusterAdmin: func(_ context.Context, _ []string, _ *corev1.Secret) (sarama.ClusterAdmin, error) {
 			return &kafkatesting.MockKafkaClusterAdmin{
 				ExpectedTopicName: expectedTopicName,