diff --git a/control-plane/pkg/reconciler/channel/channel.go b/control-plane/pkg/reconciler/channel/channel.go index 35af8b9c28..27d2015059 100644 --- a/control-plane/pkg/reconciler/channel/channel.go +++ b/control-plane/pkg/reconciler/channel/channel.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + eventingv1alpha1listers "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1" "strconv" "strings" "time" @@ -109,6 +110,7 @@ type Reconciler struct { ConfigMapLister corelisters.ConfigMapLister ServiceLister corelisters.ServiceLister SubscriptionLister messaginglisters.SubscriptionLister + EventPolicyLister eventingv1alpha1listers.EventPolicyLister Prober prober.NewProber @@ -121,6 +123,7 @@ type Reconciler struct { func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta1.KafkaChannel) reconciler.Event { logger := kafkalogging.CreateReconcileMethodLogger(ctx, channel) + featureFlags := feature.FromContext(ctx) statusConditionManager := base.StatusConditionManager{ Object: channel, @@ -219,8 +222,17 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta return statusConditionManager.FailedToResolveConfig(err) } + var audience *string + if featureFlags.IsOIDCAuthentication() { + audience = pointer.String(auth.GetAudience(messaging.SchemeGroupVersion.WithKind("KafkaChannel"), channel.ObjectMeta)) + logging.FromContext(ctx).Debugw("Setting the KafkaChannels audience", zap.String("audience", *audience)) + } else { + logging.FromContext(ctx).Debug("Clearing the KafkaChannels audience as OIDC is not enabled") + audience = nil + } + // Get resource configuration - channelResource, err := r.getChannelContractResource(ctx, topic, channel, authContext, topicConfig) + channelResource, err := r.getChannelContractResource(ctx, topic, channel, authContext, topicConfig, audience) if err != nil { return statusConditionManager.FailedToResolveConfig(err) } @@ -275,16 +287,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta return err } - featureFlags := feature.FromContext(ctx) - var audience *string - if featureFlags.IsOIDCAuthentication() { - audience = pointer.String(auth.GetAudience(messaging.SchemeGroupVersion.WithKind("KafkaChannel"), channel.ObjectMeta)) - logging.FromContext(ctx).Debugw("Setting the KafkaChannels audience", zap.String("audience", *audience)) - } else { - logging.FromContext(ctx).Debug("Clearing the KafkaChannels audience as OIDC is not enabled") - audience = nil - } - var addressableStatus duckv1.AddressStatus channelHttpsHost := network.GetServiceHostname(r.Env.IngressName, r.SystemNamespace) channelHttpHost := network.GetServiceHostname(channelService.Name, channel.Namespace) @@ -685,13 +687,15 @@ func (r *Reconciler) reconcileConsumerGroup(ctx context.Context, channel *messag return cg, nil } -func (r *Reconciler) getChannelContractResource(ctx context.Context, topic string, channel *messagingv1beta1.KafkaChannel, auth *security.NetSpecAuthContext, config *kafka.TopicConfig) (*contract.Resource, error) { +func (r *Reconciler) getChannelContractResource(ctx context.Context, topic string, channel *messagingv1beta1.KafkaChannel, auth *security.NetSpecAuthContext, config *kafka.TopicConfig, audience *string) (*contract.Resource, error) { + features := feature.FromContext(ctx) + resource := &contract.Resource{ Uid: string(channel.UID), Topics: []string{topic}, Ingress: &contract.Ingress{ Host: receiver.Host(channel.GetNamespace(), channel.GetName()), - EnableAutoCreateEventTypes: feature.FromContext(ctx).IsEnabled(feature.EvenTypeAutoCreate), + EnableAutoCreateEventTypes: features.IsEnabled(feature.EvenTypeAutoCreate), Path: receiver.Path(channel.GetNamespace(), channel.GetName()), }, BootstrapServers: config.GetBootstrapServers(), @@ -710,9 +714,15 @@ func (r *Reconciler) getChannelContractResource(ctx context.Context, topic strin } } - if channel.Status.Address != nil && channel.Status.Address.Audience != nil { - resource.Ingress.Audience = *channel.Status.Address.Audience + if audience != nil { + resource.Ingress.Audience = *audience + } + + eventPolicies, err := coreconfig.EventPoliciesFromAppliedEventPoliciesStatus(channel.Status.AppliedEventPoliciesStatus, r.EventPolicyLister, channel.Namespace, features) + if err != nil { + return nil, fmt.Errorf("could not get eventpolicies from channel status: %w", err) } + resource.Ingress.EventPolicies = eventPolicies egressConfig, err := coreconfig.EgressConfigFromDelivery(ctx, r.Resolver, channel, channel.Spec.Delivery, r.DefaultBackoffDelayMs) if err != nil { diff --git a/control-plane/pkg/reconciler/channel/channel_test.go b/control-plane/pkg/reconciler/channel/channel_test.go index 08a07cdfef..e5e7725134 100644 --- a/control-plane/pkg/reconciler/channel/channel_test.go +++ b/control-plane/pkg/reconciler/channel/channel_test.go @@ -1964,8 +1964,9 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ - Host: receiver.Host(ChannelNamespace, ChannelName), - Path: receiver.Path(ChannelNamespace, ChannelName), + Host: receiver.Host(ChannelNamespace, ChannelName), + Path: receiver.Path(ChannelNamespace, ChannelName), + Audience: ChannelAudience, }, }, }, @@ -2085,6 +2086,7 @@ func TestReconcileKind(t *testing.T) { ServiceLister: listers.GetServiceLister(), SubscriptionLister: listers.GetSubscriptionLister(), ConsumerGroupLister: listers.GetConsumerGroupLister(), + EventPolicyLister: listers.GetEventPolicyLister(), InternalsClient: fakeconsumergroupinformer.Get(ctx), Prober: proberMock, IngressHost: network.GetServiceHostname(env.IngressName, env.SystemNamespace), diff --git a/control-plane/pkg/reconciler/channel/controller.go b/control-plane/pkg/reconciler/channel/controller.go index 7e944337a6..9fba4f5264 100644 --- a/control-plane/pkg/reconciler/channel/controller.go +++ b/control-plane/pkg/reconciler/channel/controller.go @@ -19,6 +19,7 @@ package channel import ( "context" "fmt" + eventpolicyinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy" "net" "net/http" @@ -63,6 +64,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf configmapInformer := configmapinformer.Get(ctx) channelInformer := kafkachannelinformer.Get(ctx) consumerGroupInformer := consumergroupinformer.Get(ctx) + eventPolicyInformer := eventpolicyinformer.Get(ctx) messagingv1beta.RegisterAlternateKafkaChannelConditionSet(conditionSet) @@ -85,6 +87,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf ServiceLister: serviceinformer.Get(ctx).Lister(), SubscriptionLister: subscriptioninformer.Get(ctx).Lister(), ConsumerGroupLister: consumerGroupInformer.Lister(), + EventPolicyLister: eventPolicyInformer.Lister(), InternalsClient: consumergroupclient.Get(ctx), KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(), } diff --git a/control-plane/pkg/reconciler/channel/controller_test.go b/control-plane/pkg/reconciler/channel/controller_test.go index a395e47ba0..e2a28f1243 100644 --- a/control-plane/pkg/reconciler/channel/controller_test.go +++ b/control-plane/pkg/reconciler/channel/controller_test.go @@ -28,6 +28,7 @@ import ( apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" _ "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/informers/messaging/v1beta1/kafkachannel/fake" "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool" + _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake" _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake" "knative.dev/eventing/pkg/eventingtls/eventingtlstesting" _ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake"