diff --git a/control-plane/pkg/reconciler/broker/broker.go b/control-plane/pkg/reconciler/broker/broker.go index 86b1a84997..4fbcad49ad 100644 --- a/control-plane/pkg/reconciler/broker/broker.go +++ b/control-plane/pkg/reconciler/broker/broker.go @@ -19,6 +19,7 @@ package broker import ( "context" "fmt" + "k8s.io/utils/ptr" "strings" "time" @@ -99,6 +100,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, broker *eventing.Broker) func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker) reconciler.Event { logger := kafkalogging.CreateReconcileMethodLogger(ctx, broker) + features := feature.FromContext(ctx) statusConditionManager := base.StatusConditionManager{ Object: broker, @@ -177,8 +179,22 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker) return statusConditionManager.FailedToResolveConfig(err) } + var audience *string + if features.IsOIDCAuthentication() { + audience = ptr.To(auth.GetAudience(eventing.SchemeGroupVersion.WithKind("Broker"), broker.ObjectMeta)) + logging.FromContext(ctx).Debugw("Setting the brokers audience", zap.String("audience", *audience)) + } else { + logging.FromContext(ctx).Debug("Clearing the brokers audience as OIDC is not enabled") + audience = nil + } + + err = auth.UpdateStatusWithEventPolicies(features, &broker.Status.AppliedEventPoliciesStatus, &broker.Status, r.EventPolicyLister, eventing.SchemeGroupVersion.WithKind("Broker"), broker.ObjectMeta) + if err != nil { + return fmt.Errorf("could not update broker status with EventPolicies: %v", err) + } + // Get resource configuration. - brokerResource, err := r.reconcilerBrokerResource(ctx, topic, broker, secret, topicConfig) + brokerResource, err := r.reconcilerBrokerResource(ctx, topic, broker, secret, topicConfig, audience) if err != nil { return statusConditionManager.FailedToResolveConfig(err) } @@ -236,29 +252,28 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker) ingressHost := network.GetServiceHostname(r.Env.IngressName, r.DataPlaneNamespace) - transportEncryptionFlags := feature.FromContext(ctx) var addressableStatus duckv1.AddressStatus - if transportEncryptionFlags.IsPermissiveTransportEncryption() { + if features.IsPermissiveTransportEncryption() { caCerts, err := r.getCaCerts() if err != nil { return err } - httpAddress := receiver.HTTPAddress(ingressHost, nil, broker) - httpsAddress := receiver.HTTPSAddress(ingressHost, nil, broker, caCerts) + httpAddress := receiver.HTTPAddress(ingressHost, audience, broker) + httpsAddress := receiver.HTTPSAddress(ingressHost, audience, broker, caCerts) addressableStatus.Address = &httpAddress addressableStatus.Addresses = []duckv1.Addressable{httpAddress, httpsAddress} - } else if transportEncryptionFlags.IsStrictTransportEncryption() { + } else if features.IsStrictTransportEncryption() { caCerts, err := r.getCaCerts() if err != nil { return err } - httpsAddress := receiver.HTTPSAddress(ingressHost, nil, broker, caCerts) + httpsAddress := receiver.HTTPSAddress(ingressHost, audience, broker, caCerts) addressableStatus.Address = &httpsAddress addressableStatus.Addresses = []duckv1.Addressable{httpsAddress} } else { - httpAddress := receiver.HTTPAddress(ingressHost, nil, broker) + httpAddress := receiver.HTTPAddress(ingressHost, audience, broker) addressableStatus.Address = &httpAddress addressableStatus.Addresses = []duckv1.Addressable{httpAddress} } @@ -279,32 +294,8 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker) broker.Status.Address = addressableStatus.Address broker.Status.Addresses = addressableStatus.Addresses - if feature.FromContext(ctx).IsOIDCAuthentication() { - audience := auth.GetAudience(eventing.SchemeGroupVersion.WithKind("Broker"), broker.ObjectMeta) - logging.FromContext(ctx).Debugw("Setting the brokers audience", zap.String("audience", audience)) - broker.Status.Address.Audience = &audience - - for i := range broker.Status.Addresses { - broker.Status.Addresses[i].Audience = &audience - } - } else { - logging.FromContext(ctx).Debug("Clearing the brokers audience as OIDC is not enabled") - if broker.Status.Address != nil { - broker.Status.Address.Audience = nil - } - - for i := range broker.Status.Addresses { - broker.Status.Addresses[i].Audience = nil - } - } - broker.GetConditionSet().Manage(broker.GetStatus()).MarkTrue(base.ConditionAddressable) - err = auth.UpdateStatusWithEventPolicies(feature.FromContext(ctx), &broker.Status.AppliedEventPoliciesStatus, &broker.Status, r.EventPolicyLister, eventing.SchemeGroupVersion.WithKind("Broker"), broker.ObjectMeta) - if err != nil { - return fmt.Errorf("could not update broker status with EventPolicies: %v", err) - } - return nil } @@ -629,13 +620,15 @@ func rebuildCMFromStatusAnnotations(br *eventing.Broker) *corev1.ConfigMap { return cm } -func (r *Reconciler) reconcilerBrokerResource(ctx context.Context, topic string, broker *eventing.Broker, secret *corev1.Secret, config *kafka.TopicConfig) (*contract.Resource, error) { +func (r *Reconciler) reconcilerBrokerResource(ctx context.Context, topic string, broker *eventing.Broker, secret *corev1.Secret, config *kafka.TopicConfig, audience *string) (*contract.Resource, error) { + features := feature.FromContext(ctx) + resource := &contract.Resource{ Uid: string(broker.UID), Topics: []string{topic}, Ingress: &contract.Ingress{ Path: receiver.PathFromObject(broker), - EnableAutoCreateEventTypes: feature.FromContext(ctx).IsEnabled(feature.EvenTypeAutoCreate), + EnableAutoCreateEventTypes: features.IsEnabled(feature.EvenTypeAutoCreate), }, BootstrapServers: config.GetBootstrapServers(), Reference: &contract.Reference{ @@ -658,8 +651,8 @@ func (r *Reconciler) reconcilerBrokerResource(ctx context.Context, topic string, } } - if broker.Status.Address != nil && broker.Status.Address.Audience != nil { - resource.Ingress.Audience = *broker.Status.Address.Audience + if audience != nil { + resource.Ingress.Audience = *audience } egressConfig, err := coreconfig.EgressConfigFromDelivery(ctx, r.Resolver, broker, broker.Spec.Delivery, r.DefaultBackoffDelayMs) @@ -668,6 +661,12 @@ func (r *Reconciler) reconcilerBrokerResource(ctx context.Context, topic string, } resource.EgressConfig = egressConfig + eventPolicies, err := coreconfig.EventPoliciesFromAppliedEventPoliciesStatus(broker.Status.AppliedEventPoliciesStatus, r.EventPolicyLister, broker.Namespace, features) + if err != nil { + return nil, fmt.Errorf("could not get eventpolicies from broker status: %w", err) + } + resource.Ingress.EventPolicies = eventPolicies + return resource, nil } diff --git a/control-plane/pkg/reconciler/broker/broker_test.go b/control-plane/pkg/reconciler/broker/broker_test.go index 19152f5e06..93dd26a0ca 100644 --- a/control-plane/pkg/reconciler/broker/broker_test.go +++ b/control-plane/pkg/reconciler/broker/broker_test.go @@ -399,6 +399,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { StatusBrokerDataPlaneAvailable, StatusBrokerConfigParsed, StatusBrokerTopicReady, + reconcilertesting.WithBrokerEventPoliciesReadyBecauseOIDCDisabled(), StatusBrokerProbeFailed(prober.StatusNotReady), BrokerConfigMapAnnotations(), WithTopicStatusAnnotation(BrokerTopic()), @@ -462,6 +463,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { StatusBrokerDataPlaneAvailable, StatusBrokerConfigParsed, StatusBrokerTopicReady, + reconcilertesting.WithBrokerEventPoliciesReadyBecauseOIDCDisabled(), BrokerConfigMapAnnotations(), WithTopicStatusAnnotation(BrokerTopic()), StatusBrokerProbeFailed(prober.StatusUnknown), @@ -1257,6 +1259,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { StatusBrokerDataPlaneAvailable, StatusBrokerConfigNotParsed("failed to resolve Spec.Delivery.DeadLetterSink: destination missing Ref and URI, expected at least one"), StatusBrokerTopicReady, + reconcilertesting.WithBrokerEventPoliciesReadyBecauseOIDCDisabled(), BrokerConfigMapAnnotations(), WithTopicStatusAnnotation(BrokerTopic()), ), @@ -2283,9 +2286,27 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { ConfigMapUpdate(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, env.ContractConfigMapFormat, &contract.Contract{ Resources: []*contract.Resource{ { - Uid: BrokerUUID, - Topics: []string{BrokerTopic()}, - Ingress: &contract.Ingress{Path: receiver.Path(BrokerNamespace, BrokerName)}, + Uid: BrokerUUID, + Topics: []string{BrokerTopic()}, + Ingress: &contract.Ingress{ + Path: receiver.Path(BrokerNamespace, BrokerName), + Audience: brokerAudience, + EventPolicies: []*contract.EventPolicy{ + { + TokenMatchers: []*contract.TokenMatcher{ + { + Matcher: &contract.TokenMatcher_Prefix{ + Prefix: &contract.Prefix{ + Attributes: map[string]string{ + "sub": "system:serviceaccount:" + BrokerNamespace + ":", + }, + }, + }, + }, + }, + }, + }, + }, BootstrapServers: bootstrapServers, Reference: BrokerReference(), Auth: &contract.Resource_AuthSecret{ @@ -2373,6 +2394,9 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { reconcilertesting.NewEventPolicy(readyEventPolicyName, BrokerNamespace, reconcilertesting.WithReadyEventPolicyCondition, reconcilertesting.WithEventPolicyToRef(brokerV1GVK, BrokerName), + reconcilertesting.WithEventPolicyStatusFromSub([]string{ + "sub", + }), ), }, Key: testKey, @@ -2383,9 +2407,26 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { ConfigMapUpdate(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, env.ContractConfigMapFormat, &contract.Contract{ Resources: []*contract.Resource{ { - Uid: BrokerUUID, - Topics: []string{BrokerTopic()}, - Ingress: &contract.Ingress{Path: receiver.Path(BrokerNamespace, BrokerName)}, + Uid: BrokerUUID, + Topics: []string{BrokerTopic()}, + Ingress: &contract.Ingress{ + Path: receiver.Path(BrokerNamespace, BrokerName), + Audience: brokerAudience, + EventPolicies: []*contract.EventPolicy{ + { + TokenMatchers: []*contract.TokenMatcher{ + { + Matcher: &contract.TokenMatcher_Exact{ + Exact: &contract.Exact{ + Attributes: map[string]string{ + "sub": "sub", + }, + }, + }, + }, + }, + }, + }}, BootstrapServers: bootstrapServers, Reference: BrokerReference(), }, @@ -2456,6 +2497,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { reconcilertesting.NewEventPolicy(unreadyEventPolicyName, BrokerNamespace, reconcilertesting.WithUnreadyEventPolicyCondition("", ""), reconcilertesting.WithEventPolicyToRef(brokerV1GVK, BrokerName), + reconcilertesting.WithEventPolicyStatusFromSub([]string{"sub"}), ), }, Key: testKey, @@ -2466,9 +2508,27 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { ConfigMapUpdate(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, env.ContractConfigMapFormat, &contract.Contract{ Resources: []*contract.Resource{ { - Uid: BrokerUUID, - Topics: []string{BrokerTopic()}, - Ingress: &contract.Ingress{Path: receiver.Path(BrokerNamespace, BrokerName)}, + Uid: BrokerUUID, + Topics: []string{BrokerTopic()}, + Ingress: &contract.Ingress{ + Path: receiver.Path(BrokerNamespace, BrokerName), + Audience: brokerAudience, + EventPolicies: []*contract.EventPolicy{ + { + TokenMatchers: []*contract.TokenMatcher{ + { + Matcher: &contract.TokenMatcher_Prefix{ + Prefix: &contract.Prefix{ + Attributes: map[string]string{ + "sub": "system:serviceaccount:" + BrokerNamespace + ":", + }, + }, + }, + }, + }, + }, + }, + }, BootstrapServers: bootstrapServers, Reference: BrokerReference(), }, @@ -2517,7 +2577,8 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { }, }, Ctx: feature.ToContext(context.Background(), feature.Flags{ - feature.OIDCAuthentication: feature.Enabled, + feature.OIDCAuthentication: feature.Enabled, + feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, }), }, }