From ba49d0f965e803406672dc35b2ccc456415ebed4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 20 Aug 2024 18:46:06 +0200 Subject: [PATCH] Provision contract fields for authorization (#4064) * Add utils function to get contract EP from AppliedEventPoliciesStatus * Add unit test * Provision EventPolicies in broker contract * Provision EventPolicies in kafkachannel contract * Provision EventPolicies in kafkasink contract * run gofmt and goimports * Address review comment --- control-plane/pkg/core/config/utils.go | 80 ++++++ control-plane/pkg/core/config/utils_test.go | 231 ++++++++++++++++++ control-plane/pkg/reconciler/broker/broker.go | 74 +++--- .../pkg/reconciler/broker/broker_test.go | 81 +++++- .../pkg/reconciler/channel/channel.go | 41 ++-- .../pkg/reconciler/channel/channel_test.go | 6 +- .../pkg/reconciler/channel/controller.go | 4 + .../pkg/reconciler/channel/controller_test.go | 1 + .../pkg/reconciler/sink/controller.go | 4 + .../pkg/reconciler/sink/controller_test.go | 1 + .../pkg/reconciler/sink/kafka_sink.go | 122 ++++----- .../pkg/reconciler/sink/kafka_sink_test.go | 13 +- 12 files changed, 536 insertions(+), 122 deletions(-) diff --git a/control-plane/pkg/core/config/utils.go b/control-plane/pkg/core/config/utils.go index 2a9d3a51bb..8a30079300 100644 --- a/control-plane/pkg/core/config/utils.go +++ b/control-plane/pkg/core/config/utils.go @@ -22,6 +22,10 @@ import ( "fmt" "math" "sort" + "strings" + + "knative.dev/eventing/pkg/apis/feature" + "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1" "github.com/rickb777/date/period" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -51,6 +55,82 @@ func ContentModeFromString(mode string) contract.ContentMode { } } +// EventPoliciesFromAppliedEventPoliciesStatus resolves a AppliedEventPoliciesStatus into a list of contract.EventPolicy +func EventPoliciesFromAppliedEventPoliciesStatus(status duck.AppliedEventPoliciesStatus, lister v1alpha1.EventPolicyLister, namespace string, features feature.Flags) ([]*contract.EventPolicy, error) { + eventPolicies := make([]*contract.EventPolicy, 0, len(status.Policies)) + + for _, appliedPolicy := range status.Policies { + policy, err := lister.EventPolicies(namespace).Get(appliedPolicy.Name) + if err != nil { + return nil, fmt.Errorf("failed to get eventPolicy %s: %w", appliedPolicy.Name, err) + } + + contractPolicy := &contract.EventPolicy{} + for _, from := range policy.Status.From { + if strings.HasSuffix(from, "*") { + contractPolicy.TokenMatchers = append(contractPolicy.TokenMatchers, &contract.TokenMatcher{ + Matcher: &contract.TokenMatcher_Prefix{ + Prefix: &contract.Prefix{ + Attributes: map[string]string{ + "sub": strings.TrimSuffix(from, "*"), + }, + }, + }, + }) + } else { + contractPolicy.TokenMatchers = append(contractPolicy.TokenMatchers, &contract.TokenMatcher{ + Matcher: &contract.TokenMatcher_Exact{ + Exact: &contract.Exact{ + Attributes: map[string]string{ + "sub": from, + }, + }, + }, + }) + } + } + + eventPolicies = append(eventPolicies, contractPolicy) + } + + if len(eventPolicies) == 0 { + if features.IsAuthorizationDefaultModeAllowAll() { + // add event policy to match all subs + eventPolicies = append(eventPolicies, &contract.EventPolicy{ + TokenMatchers: []*contract.TokenMatcher{ + { + Matcher: &contract.TokenMatcher_Prefix{ + Prefix: &contract.Prefix{ + Attributes: map[string]string{ + "sub": "", + }, + }, + }, + }, + }, + }) + } else if features.IsAuthorizationDefaultModeSameNamespace() { + // add event policy with prefix match + eventPolicies = append(eventPolicies, &contract.EventPolicy{ + TokenMatchers: []*contract.TokenMatcher{ + { + Matcher: &contract.TokenMatcher_Prefix{ + Prefix: &contract.Prefix{ + Attributes: map[string]string{ + "sub": fmt.Sprintf("system:serviceaccount:%s:", namespace), + }, + }, + }, + }, + }, + }) + } + // else: deny all -> add no additional policy + } + + return eventPolicies, nil +} + func EgressConfigFromDelivery( ctx context.Context, resolver *resolver.URIResolver, diff --git a/control-plane/pkg/core/config/utils_test.go b/control-plane/pkg/core/config/utils_test.go index 34cb2e70e5..3e949a459e 100644 --- a/control-plane/pkg/core/config/utils_test.go +++ b/control-plane/pkg/core/config/utils_test.go @@ -23,6 +23,11 @@ import ( "testing" "time" + "google.golang.org/protobuf/encoding/protojson" + eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + "knative.dev/eventing/pkg/apis/feature" + reconcilertesting "knative.dev/pkg/reconciler/testing" + "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "google.golang.org/protobuf/runtime/protoimpl" @@ -41,6 +46,7 @@ import ( "knative.dev/eventing-kafka-broker/control-plane/pkg/contract" eventing "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1" + eventpolicyinformerfake "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake" ) func TestContentModeFromString(t *testing.T) { @@ -504,3 +510,228 @@ func TestMergeEgressConfig(t *testing.T) { }) } } + +func TestEventPoliciesFromAppliedEventPoliciesStatus(t *testing.T) { + + tests := []struct { + name string + applyingPolicies []string + existingEventPolicies []*eventingv1alpha1.EventPolicy + namespace string + defaultAuthorizationMode feature.Flag + expected []*contract.EventPolicy + wantErr bool + }{ + { + name: "Exact match", + applyingPolicies: []string{ + "policy-1", + }, + existingEventPolicies: []*eventingv1alpha1.EventPolicy{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "policy-1", + Namespace: "my-ns", + }, + Status: eventingv1alpha1.EventPolicyStatus{ + From: []string{ + "from-1", + }, + }, + }, + }, + namespace: "my-ns", + defaultAuthorizationMode: feature.AuthorizationDenyAll, + expected: []*contract.EventPolicy{ + { + TokenMatchers: []*contract.TokenMatcher{ + exactTokenMatcher("from-1"), + }, + }, + }, + }, { + name: "Prefix match", + applyingPolicies: []string{ + "policy-1", + }, + existingEventPolicies: []*eventingv1alpha1.EventPolicy{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "policy-1", + Namespace: "my-ns", + }, + Status: eventingv1alpha1.EventPolicyStatus{ + From: []string{ + "from-*", + }, + }, + }, + }, + namespace: "my-ns", + defaultAuthorizationMode: feature.AuthorizationDenyAll, + expected: []*contract.EventPolicy{ + { + TokenMatchers: []*contract.TokenMatcher{ + prefixTokenMatcher("from-"), + }, + }, + }, + }, { + name: "Multiple policies", + applyingPolicies: []string{ + "policy-1", + "policy-2", + }, + existingEventPolicies: []*eventingv1alpha1.EventPolicy{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "policy-1", + Namespace: "my-ns", + }, + Status: eventingv1alpha1.EventPolicyStatus{ + From: []string{ + "from-1", + }, + }, + }, { + ObjectMeta: metav1.ObjectMeta{ + Name: "policy-2", + Namespace: "my-ns", + }, + Status: eventingv1alpha1.EventPolicyStatus{ + From: []string{ + "from-2-*", + }, + }, + }, + }, + namespace: "my-ns", + defaultAuthorizationMode: feature.AuthorizationDenyAll, + expected: []*contract.EventPolicy{ + { + TokenMatchers: []*contract.TokenMatcher{ + exactTokenMatcher("from-1"), + }, + }, { + TokenMatchers: []*contract.TokenMatcher{ + prefixTokenMatcher("from-2-"), + }, + }, + }, + }, { + name: "No applying policies - allow-same-namespace default mode", + applyingPolicies: []string{}, + existingEventPolicies: []*eventingv1alpha1.EventPolicy{}, + namespace: "my-ns", + defaultAuthorizationMode: feature.AuthorizationAllowSameNamespace, + expected: []*contract.EventPolicy{ + { + TokenMatchers: []*contract.TokenMatcher{ + prefixTokenMatcher("system:serviceaccount:my-ns:"), + }, + }, + }, + }, { + name: "No applying policies - allow-all default mode", + applyingPolicies: []string{}, + existingEventPolicies: []*eventingv1alpha1.EventPolicy{}, + namespace: "my-ns", + defaultAuthorizationMode: feature.AuthorizationAllowAll, + expected: []*contract.EventPolicy{ + { + TokenMatchers: []*contract.TokenMatcher{ + prefixTokenMatcher(""), + }, + }, + }, + }, { + name: "No applying policies - deny-all default mode", + applyingPolicies: []string{}, + existingEventPolicies: []*eventingv1alpha1.EventPolicy{}, + namespace: "my-ns", + defaultAuthorizationMode: feature.AuthorizationDenyAll, + expected: []*contract.EventPolicy{}, + }, { + name: "Applying policy does not exist", + applyingPolicies: []string{ + "not-found", + }, + existingEventPolicies: []*eventingv1alpha1.EventPolicy{}, + namespace: "my-ns", + defaultAuthorizationMode: feature.AuthorizationAllowSameNamespace, + expected: []*contract.EventPolicy{}, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + ctx, _ := reconcilertesting.SetupFakeContext(t) + features := feature.Flags{ + feature.AuthorizationDefaultMode: tt.defaultAuthorizationMode, + } + + for _, ep := range tt.existingEventPolicies { + err := eventpolicyinformerfake.Get(ctx).Informer().GetStore().Add(ep) + if err != nil { + t.Fatal(err) + } + } + + applyingPoliciesStatus := eventingduck.AppliedEventPoliciesStatus{} + for _, ep := range tt.applyingPolicies { + applyingPoliciesStatus.Policies = append(applyingPoliciesStatus.Policies, eventingduck.AppliedEventPolicyRef{ + Name: ep, + APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), + }) + } + + got, err := EventPoliciesFromAppliedEventPoliciesStatus(applyingPoliciesStatus, eventpolicyinformerfake.Get(ctx).Lister(), tt.namespace, features) + if (err != nil) != tt.wantErr { + t.Errorf("EventPoliciesFromAppliedEventPoliciesStatus() error = %v, wantErr %v", err, tt.wantErr) + return + } + + expectedJSON, err := protojson.Marshal(&contract.Ingress{ + EventPolicies: tt.expected, + }) + if err != nil { + t.Fatal(err) + } + gotJSON, err := protojson.Marshal(&contract.Ingress{ + EventPolicies: got, + }) + if err != nil { + t.Fatal(err) + } + + if diff := cmp.Diff(expectedJSON, gotJSON); diff != "" { + t.Errorf("(-want, +got) %s", diff) + } + }) + } +} + +func exactTokenMatcher(sub string) *contract.TokenMatcher { + return &contract.TokenMatcher{ + Matcher: &contract.TokenMatcher_Exact{ + Exact: &contract.Exact{ + Attributes: map[string]string{ + "sub": sub, + }, + }, + }, + } +} + +func prefixTokenMatcher(sub string) *contract.TokenMatcher { + return &contract.TokenMatcher{ + Matcher: &contract.TokenMatcher_Prefix{ + Prefix: &contract.Prefix{ + Attributes: map[string]string{ + "sub": sub, + }, + }, + }, + } +} diff --git a/control-plane/pkg/reconciler/broker/broker.go b/control-plane/pkg/reconciler/broker/broker.go index 86b1a84997..c4c1551f64 100644 --- a/control-plane/pkg/reconciler/broker/broker.go +++ b/control-plane/pkg/reconciler/broker/broker.go @@ -22,6 +22,10 @@ import ( "strings" "time" + eventingduck "knative.dev/eventing/pkg/apis/duck/v1" + + "k8s.io/utils/ptr" + "knative.dev/eventing/pkg/auth" "knative.dev/pkg/logging" @@ -99,6 +103,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, broker *eventing.Broker) func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker) reconciler.Event { logger := kafkalogging.CreateReconcileMethodLogger(ctx, broker) + features := feature.FromContext(ctx) statusConditionManager := base.StatusConditionManager{ Object: broker, @@ -177,8 +182,22 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker) return statusConditionManager.FailedToResolveConfig(err) } + var audience *string + if features.IsOIDCAuthentication() { + audience = ptr.To(auth.GetAudience(eventing.SchemeGroupVersion.WithKind("Broker"), broker.ObjectMeta)) + logging.FromContext(ctx).Debugw("Setting the brokers audience", zap.String("audience", *audience)) + } else { + logging.FromContext(ctx).Debug("Clearing the brokers audience as OIDC is not enabled") + audience = nil + } + + err = auth.UpdateStatusWithEventPolicies(features, &broker.Status.AppliedEventPoliciesStatus, &broker.Status, r.EventPolicyLister, eventing.SchemeGroupVersion.WithKind("Broker"), broker.ObjectMeta) + if err != nil { + return fmt.Errorf("could not update broker status with EventPolicies: %v", err) + } + // Get resource configuration. - brokerResource, err := r.reconcilerBrokerResource(ctx, topic, broker, secret, topicConfig) + brokerResource, err := r.reconcilerBrokerResource(ctx, topic, broker, secret, topicConfig, audience, broker.Status.AppliedEventPoliciesStatus) if err != nil { return statusConditionManager.FailedToResolveConfig(err) } @@ -236,29 +255,28 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker) ingressHost := network.GetServiceHostname(r.Env.IngressName, r.DataPlaneNamespace) - transportEncryptionFlags := feature.FromContext(ctx) var addressableStatus duckv1.AddressStatus - if transportEncryptionFlags.IsPermissiveTransportEncryption() { + if features.IsPermissiveTransportEncryption() { caCerts, err := r.getCaCerts() if err != nil { return err } - httpAddress := receiver.HTTPAddress(ingressHost, nil, broker) - httpsAddress := receiver.HTTPSAddress(ingressHost, nil, broker, caCerts) + httpAddress := receiver.HTTPAddress(ingressHost, audience, broker) + httpsAddress := receiver.HTTPSAddress(ingressHost, audience, broker, caCerts) addressableStatus.Address = &httpAddress addressableStatus.Addresses = []duckv1.Addressable{httpAddress, httpsAddress} - } else if transportEncryptionFlags.IsStrictTransportEncryption() { + } else if features.IsStrictTransportEncryption() { caCerts, err := r.getCaCerts() if err != nil { return err } - httpsAddress := receiver.HTTPSAddress(ingressHost, nil, broker, caCerts) + httpsAddress := receiver.HTTPSAddress(ingressHost, audience, broker, caCerts) addressableStatus.Address = &httpsAddress addressableStatus.Addresses = []duckv1.Addressable{httpsAddress} } else { - httpAddress := receiver.HTTPAddress(ingressHost, nil, broker) + httpAddress := receiver.HTTPAddress(ingressHost, audience, broker) addressableStatus.Address = &httpAddress addressableStatus.Addresses = []duckv1.Addressable{httpAddress} } @@ -279,32 +297,8 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker) broker.Status.Address = addressableStatus.Address broker.Status.Addresses = addressableStatus.Addresses - if feature.FromContext(ctx).IsOIDCAuthentication() { - audience := auth.GetAudience(eventing.SchemeGroupVersion.WithKind("Broker"), broker.ObjectMeta) - logging.FromContext(ctx).Debugw("Setting the brokers audience", zap.String("audience", audience)) - broker.Status.Address.Audience = &audience - - for i := range broker.Status.Addresses { - broker.Status.Addresses[i].Audience = &audience - } - } else { - logging.FromContext(ctx).Debug("Clearing the brokers audience as OIDC is not enabled") - if broker.Status.Address != nil { - broker.Status.Address.Audience = nil - } - - for i := range broker.Status.Addresses { - broker.Status.Addresses[i].Audience = nil - } - } - broker.GetConditionSet().Manage(broker.GetStatus()).MarkTrue(base.ConditionAddressable) - err = auth.UpdateStatusWithEventPolicies(feature.FromContext(ctx), &broker.Status.AppliedEventPoliciesStatus, &broker.Status, r.EventPolicyLister, eventing.SchemeGroupVersion.WithKind("Broker"), broker.ObjectMeta) - if err != nil { - return fmt.Errorf("could not update broker status with EventPolicies: %v", err) - } - return nil } @@ -629,13 +623,15 @@ func rebuildCMFromStatusAnnotations(br *eventing.Broker) *corev1.ConfigMap { return cm } -func (r *Reconciler) reconcilerBrokerResource(ctx context.Context, topic string, broker *eventing.Broker, secret *corev1.Secret, config *kafka.TopicConfig) (*contract.Resource, error) { +func (r *Reconciler) reconcilerBrokerResource(ctx context.Context, topic string, broker *eventing.Broker, secret *corev1.Secret, config *kafka.TopicConfig, audience *string, appliedEventPoliciesStatus eventingduck.AppliedEventPoliciesStatus) (*contract.Resource, error) { + features := feature.FromContext(ctx) + resource := &contract.Resource{ Uid: string(broker.UID), Topics: []string{topic}, Ingress: &contract.Ingress{ Path: receiver.PathFromObject(broker), - EnableAutoCreateEventTypes: feature.FromContext(ctx).IsEnabled(feature.EvenTypeAutoCreate), + EnableAutoCreateEventTypes: features.IsEnabled(feature.EvenTypeAutoCreate), }, BootstrapServers: config.GetBootstrapServers(), Reference: &contract.Reference{ @@ -658,8 +654,8 @@ func (r *Reconciler) reconcilerBrokerResource(ctx context.Context, topic string, } } - if broker.Status.Address != nil && broker.Status.Address.Audience != nil { - resource.Ingress.Audience = *broker.Status.Address.Audience + if audience != nil { + resource.Ingress.Audience = *audience } egressConfig, err := coreconfig.EgressConfigFromDelivery(ctx, r.Resolver, broker, broker.Spec.Delivery, r.DefaultBackoffDelayMs) @@ -668,6 +664,12 @@ func (r *Reconciler) reconcilerBrokerResource(ctx context.Context, topic string, } resource.EgressConfig = egressConfig + eventPolicies, err := coreconfig.EventPoliciesFromAppliedEventPoliciesStatus(appliedEventPoliciesStatus, r.EventPolicyLister, broker.Namespace, features) + if err != nil { + return nil, fmt.Errorf("could not get eventpolicies from broker status: %w", err) + } + resource.Ingress.EventPolicies = eventPolicies + return resource, nil } diff --git a/control-plane/pkg/reconciler/broker/broker_test.go b/control-plane/pkg/reconciler/broker/broker_test.go index 19152f5e06..93dd26a0ca 100644 --- a/control-plane/pkg/reconciler/broker/broker_test.go +++ b/control-plane/pkg/reconciler/broker/broker_test.go @@ -399,6 +399,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { StatusBrokerDataPlaneAvailable, StatusBrokerConfigParsed, StatusBrokerTopicReady, + reconcilertesting.WithBrokerEventPoliciesReadyBecauseOIDCDisabled(), StatusBrokerProbeFailed(prober.StatusNotReady), BrokerConfigMapAnnotations(), WithTopicStatusAnnotation(BrokerTopic()), @@ -462,6 +463,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { StatusBrokerDataPlaneAvailable, StatusBrokerConfigParsed, StatusBrokerTopicReady, + reconcilertesting.WithBrokerEventPoliciesReadyBecauseOIDCDisabled(), BrokerConfigMapAnnotations(), WithTopicStatusAnnotation(BrokerTopic()), StatusBrokerProbeFailed(prober.StatusUnknown), @@ -1257,6 +1259,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { StatusBrokerDataPlaneAvailable, StatusBrokerConfigNotParsed("failed to resolve Spec.Delivery.DeadLetterSink: destination missing Ref and URI, expected at least one"), StatusBrokerTopicReady, + reconcilertesting.WithBrokerEventPoliciesReadyBecauseOIDCDisabled(), BrokerConfigMapAnnotations(), WithTopicStatusAnnotation(BrokerTopic()), ), @@ -2283,9 +2286,27 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { ConfigMapUpdate(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, env.ContractConfigMapFormat, &contract.Contract{ Resources: []*contract.Resource{ { - Uid: BrokerUUID, - Topics: []string{BrokerTopic()}, - Ingress: &contract.Ingress{Path: receiver.Path(BrokerNamespace, BrokerName)}, + Uid: BrokerUUID, + Topics: []string{BrokerTopic()}, + Ingress: &contract.Ingress{ + Path: receiver.Path(BrokerNamespace, BrokerName), + Audience: brokerAudience, + EventPolicies: []*contract.EventPolicy{ + { + TokenMatchers: []*contract.TokenMatcher{ + { + Matcher: &contract.TokenMatcher_Prefix{ + Prefix: &contract.Prefix{ + Attributes: map[string]string{ + "sub": "system:serviceaccount:" + BrokerNamespace + ":", + }, + }, + }, + }, + }, + }, + }, + }, BootstrapServers: bootstrapServers, Reference: BrokerReference(), Auth: &contract.Resource_AuthSecret{ @@ -2373,6 +2394,9 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { reconcilertesting.NewEventPolicy(readyEventPolicyName, BrokerNamespace, reconcilertesting.WithReadyEventPolicyCondition, reconcilertesting.WithEventPolicyToRef(brokerV1GVK, BrokerName), + reconcilertesting.WithEventPolicyStatusFromSub([]string{ + "sub", + }), ), }, Key: testKey, @@ -2383,9 +2407,26 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { ConfigMapUpdate(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, env.ContractConfigMapFormat, &contract.Contract{ Resources: []*contract.Resource{ { - Uid: BrokerUUID, - Topics: []string{BrokerTopic()}, - Ingress: &contract.Ingress{Path: receiver.Path(BrokerNamespace, BrokerName)}, + Uid: BrokerUUID, + Topics: []string{BrokerTopic()}, + Ingress: &contract.Ingress{ + Path: receiver.Path(BrokerNamespace, BrokerName), + Audience: brokerAudience, + EventPolicies: []*contract.EventPolicy{ + { + TokenMatchers: []*contract.TokenMatcher{ + { + Matcher: &contract.TokenMatcher_Exact{ + Exact: &contract.Exact{ + Attributes: map[string]string{ + "sub": "sub", + }, + }, + }, + }, + }, + }, + }}, BootstrapServers: bootstrapServers, Reference: BrokerReference(), }, @@ -2456,6 +2497,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { reconcilertesting.NewEventPolicy(unreadyEventPolicyName, BrokerNamespace, reconcilertesting.WithUnreadyEventPolicyCondition("", ""), reconcilertesting.WithEventPolicyToRef(brokerV1GVK, BrokerName), + reconcilertesting.WithEventPolicyStatusFromSub([]string{"sub"}), ), }, Key: testKey, @@ -2466,9 +2508,27 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { ConfigMapUpdate(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, env.ContractConfigMapFormat, &contract.Contract{ Resources: []*contract.Resource{ { - Uid: BrokerUUID, - Topics: []string{BrokerTopic()}, - Ingress: &contract.Ingress{Path: receiver.Path(BrokerNamespace, BrokerName)}, + Uid: BrokerUUID, + Topics: []string{BrokerTopic()}, + Ingress: &contract.Ingress{ + Path: receiver.Path(BrokerNamespace, BrokerName), + Audience: brokerAudience, + EventPolicies: []*contract.EventPolicy{ + { + TokenMatchers: []*contract.TokenMatcher{ + { + Matcher: &contract.TokenMatcher_Prefix{ + Prefix: &contract.Prefix{ + Attributes: map[string]string{ + "sub": "system:serviceaccount:" + BrokerNamespace + ":", + }, + }, + }, + }, + }, + }, + }, + }, BootstrapServers: bootstrapServers, Reference: BrokerReference(), }, @@ -2517,7 +2577,8 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) { }, }, Ctx: feature.ToContext(context.Background(), feature.Flags{ - feature.OIDCAuthentication: feature.Enabled, + feature.OIDCAuthentication: feature.Enabled, + feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, }), }, } diff --git a/control-plane/pkg/reconciler/channel/channel.go b/control-plane/pkg/reconciler/channel/channel.go index 35af8b9c28..2f9089b0e7 100644 --- a/control-plane/pkg/reconciler/channel/channel.go +++ b/control-plane/pkg/reconciler/channel/channel.go @@ -24,6 +24,8 @@ import ( "strings" "time" + eventingv1alpha1listers "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1" + "k8s.io/utils/pointer" "knative.dev/eventing/pkg/auth" "knative.dev/pkg/logging" @@ -109,6 +111,7 @@ type Reconciler struct { ConfigMapLister corelisters.ConfigMapLister ServiceLister corelisters.ServiceLister SubscriptionLister messaginglisters.SubscriptionLister + EventPolicyLister eventingv1alpha1listers.EventPolicyLister Prober prober.NewProber @@ -121,6 +124,7 @@ type Reconciler struct { func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta1.KafkaChannel) reconciler.Event { logger := kafkalogging.CreateReconcileMethodLogger(ctx, channel) + featureFlags := feature.FromContext(ctx) statusConditionManager := base.StatusConditionManager{ Object: channel, @@ -219,8 +223,17 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta return statusConditionManager.FailedToResolveConfig(err) } + var audience *string + if featureFlags.IsOIDCAuthentication() { + audience = pointer.String(auth.GetAudience(messaging.SchemeGroupVersion.WithKind("KafkaChannel"), channel.ObjectMeta)) + logging.FromContext(ctx).Debugw("Setting the KafkaChannels audience", zap.String("audience", *audience)) + } else { + logging.FromContext(ctx).Debug("Clearing the KafkaChannels audience as OIDC is not enabled") + audience = nil + } + // Get resource configuration - channelResource, err := r.getChannelContractResource(ctx, topic, channel, authContext, topicConfig) + channelResource, err := r.getChannelContractResource(ctx, topic, channel, authContext, topicConfig, audience, channel.Status.AppliedEventPoliciesStatus) if err != nil { return statusConditionManager.FailedToResolveConfig(err) } @@ -275,16 +288,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta return err } - featureFlags := feature.FromContext(ctx) - var audience *string - if featureFlags.IsOIDCAuthentication() { - audience = pointer.String(auth.GetAudience(messaging.SchemeGroupVersion.WithKind("KafkaChannel"), channel.ObjectMeta)) - logging.FromContext(ctx).Debugw("Setting the KafkaChannels audience", zap.String("audience", *audience)) - } else { - logging.FromContext(ctx).Debug("Clearing the KafkaChannels audience as OIDC is not enabled") - audience = nil - } - var addressableStatus duckv1.AddressStatus channelHttpsHost := network.GetServiceHostname(r.Env.IngressName, r.SystemNamespace) channelHttpHost := network.GetServiceHostname(channelService.Name, channel.Namespace) @@ -685,13 +688,15 @@ func (r *Reconciler) reconcileConsumerGroup(ctx context.Context, channel *messag return cg, nil } -func (r *Reconciler) getChannelContractResource(ctx context.Context, topic string, channel *messagingv1beta1.KafkaChannel, auth *security.NetSpecAuthContext, config *kafka.TopicConfig) (*contract.Resource, error) { +func (r *Reconciler) getChannelContractResource(ctx context.Context, topic string, channel *messagingv1beta1.KafkaChannel, auth *security.NetSpecAuthContext, config *kafka.TopicConfig, audience *string, appliedEventPoliciesStatus v1.AppliedEventPoliciesStatus) (*contract.Resource, error) { + features := feature.FromContext(ctx) + resource := &contract.Resource{ Uid: string(channel.UID), Topics: []string{topic}, Ingress: &contract.Ingress{ Host: receiver.Host(channel.GetNamespace(), channel.GetName()), - EnableAutoCreateEventTypes: feature.FromContext(ctx).IsEnabled(feature.EvenTypeAutoCreate), + EnableAutoCreateEventTypes: features.IsEnabled(feature.EvenTypeAutoCreate), Path: receiver.Path(channel.GetNamespace(), channel.GetName()), }, BootstrapServers: config.GetBootstrapServers(), @@ -710,9 +715,15 @@ func (r *Reconciler) getChannelContractResource(ctx context.Context, topic strin } } - if channel.Status.Address != nil && channel.Status.Address.Audience != nil { - resource.Ingress.Audience = *channel.Status.Address.Audience + if audience != nil { + resource.Ingress.Audience = *audience + } + + eventPolicies, err := coreconfig.EventPoliciesFromAppliedEventPoliciesStatus(appliedEventPoliciesStatus, r.EventPolicyLister, channel.Namespace, features) + if err != nil { + return nil, fmt.Errorf("could not get eventpolicies from channel status: %w", err) } + resource.Ingress.EventPolicies = eventPolicies egressConfig, err := coreconfig.EgressConfigFromDelivery(ctx, r.Resolver, channel, channel.Spec.Delivery, r.DefaultBackoffDelayMs) if err != nil { diff --git a/control-plane/pkg/reconciler/channel/channel_test.go b/control-plane/pkg/reconciler/channel/channel_test.go index 08a07cdfef..e5e7725134 100644 --- a/control-plane/pkg/reconciler/channel/channel_test.go +++ b/control-plane/pkg/reconciler/channel/channel_test.go @@ -1964,8 +1964,9 @@ func TestReconcileKind(t *testing.T) { BootstrapServers: ChannelBootstrapServers, Reference: ChannelReference(), Ingress: &contract.Ingress{ - Host: receiver.Host(ChannelNamespace, ChannelName), - Path: receiver.Path(ChannelNamespace, ChannelName), + Host: receiver.Host(ChannelNamespace, ChannelName), + Path: receiver.Path(ChannelNamespace, ChannelName), + Audience: ChannelAudience, }, }, }, @@ -2085,6 +2086,7 @@ func TestReconcileKind(t *testing.T) { ServiceLister: listers.GetServiceLister(), SubscriptionLister: listers.GetSubscriptionLister(), ConsumerGroupLister: listers.GetConsumerGroupLister(), + EventPolicyLister: listers.GetEventPolicyLister(), InternalsClient: fakeconsumergroupinformer.Get(ctx), Prober: proberMock, IngressHost: network.GetServiceHostname(env.IngressName, env.SystemNamespace), diff --git a/control-plane/pkg/reconciler/channel/controller.go b/control-plane/pkg/reconciler/channel/controller.go index 7e944337a6..d383051edf 100644 --- a/control-plane/pkg/reconciler/channel/controller.go +++ b/control-plane/pkg/reconciler/channel/controller.go @@ -22,6 +22,8 @@ import ( "net" "net/http" + eventpolicyinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy" + "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/cache" @@ -63,6 +65,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf configmapInformer := configmapinformer.Get(ctx) channelInformer := kafkachannelinformer.Get(ctx) consumerGroupInformer := consumergroupinformer.Get(ctx) + eventPolicyInformer := eventpolicyinformer.Get(ctx) messagingv1beta.RegisterAlternateKafkaChannelConditionSet(conditionSet) @@ -85,6 +88,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf ServiceLister: serviceinformer.Get(ctx).Lister(), SubscriptionLister: subscriptioninformer.Get(ctx).Lister(), ConsumerGroupLister: consumerGroupInformer.Lister(), + EventPolicyLister: eventPolicyInformer.Lister(), InternalsClient: consumergroupclient.Get(ctx), KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(), } diff --git a/control-plane/pkg/reconciler/channel/controller_test.go b/control-plane/pkg/reconciler/channel/controller_test.go index a395e47ba0..e2a28f1243 100644 --- a/control-plane/pkg/reconciler/channel/controller_test.go +++ b/control-plane/pkg/reconciler/channel/controller_test.go @@ -28,6 +28,7 @@ import ( apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" _ "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/informers/messaging/v1beta1/kafkachannel/fake" "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool" + _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake" _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake" "knative.dev/eventing/pkg/eventingtls/eventingtlstesting" _ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake" diff --git a/control-plane/pkg/reconciler/sink/controller.go b/control-plane/pkg/reconciler/sink/controller.go index 43f2d32675..b18f5ba9aa 100644 --- a/control-plane/pkg/reconciler/sink/controller.go +++ b/control-plane/pkg/reconciler/sink/controller.go @@ -22,6 +22,8 @@ import ( "net" "net/http" + eventpolicyinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy" + "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" @@ -53,6 +55,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf logger := logging.FromContext(ctx) configmapInformer := configmapinformer.Get(ctx) + eventPolicyInformer := eventpolicyinformer.Get(ctx) clientPool := clientpool.Get(ctx) @@ -68,6 +71,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf ReceiverLabel: base.SinkReceiverLabel, }, ConfigMapLister: configmapInformer.Lister(), + EventPolicyLister: eventPolicyInformer.Lister(), GetKafkaClusterAdmin: clientPool.GetClusterAdmin, Env: configs, } diff --git a/control-plane/pkg/reconciler/sink/controller_test.go b/control-plane/pkg/reconciler/sink/controller_test.go index d1caf7ff12..0fe78cde70 100644 --- a/control-plane/pkg/reconciler/sink/controller_test.go +++ b/control-plane/pkg/reconciler/sink/controller_test.go @@ -27,6 +27,7 @@ import ( "github.com/stretchr/testify/assert" reconcilertesting "knative.dev/pkg/reconciler/testing" + _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake" _ "knative.dev/pkg/client/injection/kube/client/fake" _ "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/fake" _ "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/fake" diff --git a/control-plane/pkg/reconciler/sink/kafka_sink.go b/control-plane/pkg/reconciler/sink/kafka_sink.go index bd95c5947f..2f0e7bdf8d 100644 --- a/control-plane/pkg/reconciler/sink/kafka_sink.go +++ b/control-plane/pkg/reconciler/sink/kafka_sink.go @@ -21,6 +21,12 @@ import ( "fmt" "time" + eventingduck "knative.dev/eventing/pkg/apis/duck/v1" + + corev1 "k8s.io/api/core/v1" + eventingv1alpha1listers "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1" + + "k8s.io/utils/ptr" "knative.dev/eventing/pkg/auth" "knative.dev/pkg/logging" @@ -66,7 +72,8 @@ type Reconciler struct { Resolver *resolver.URIResolver - ConfigMapLister corelisters.ConfigMapLister + ConfigMapLister corelisters.ConfigMapLister + EventPolicyLister eventingv1alpha1listers.EventPolicyLister // GetKafkaClusterAdmin creates new sarama ClusterAdmin. It's convenient to add this as Reconciler field so that we can // mock the function used during the reconciliation loop. @@ -84,6 +91,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, ks *eventing.KafkaSink) func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink) error { logger := kafkalogging.CreateReconcileMethodLogger(ctx, ks) + features := feature.FromContext(ctx) statusConditionManager := base.StatusConditionManager{ Object: ks, @@ -180,39 +188,20 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink) zap.Any("contract", ct), ) - // Get sink configuration. - sinkConfig := &contract.Resource{ - Uid: string(ks.UID), - Topics: []string{ks.Spec.Topic}, - Ingress: &contract.Ingress{ - Path: receiver.PathFromObject(ks), - ContentMode: coreconfig.ContentModeFromString(*ks.Spec.ContentMode), - EnableAutoCreateEventTypes: feature.FromContext(ctx).IsEnabled(feature.EvenTypeAutoCreate), - }, - BootstrapServers: kafka.BootstrapServersCommaSeparated(ks.Spec.BootstrapServers), - Reference: &contract.Reference{ - Uuid: string(ks.GetUID()), - Namespace: ks.GetNamespace(), - Name: ks.GetName(), - Kind: "KafkaSink", - GroupVersion: eventingv1alpha1.SchemeGroupVersion.String(), - }, - } - if ks.Spec.HasAuthConfig() { - sinkConfig.Auth = &contract.Resource_AuthSecret{ - AuthSecret: &contract.Reference{ - Uuid: string(secret.UID), - Namespace: secret.Namespace, - Name: secret.Name, - Version: secret.ResourceVersion, - }, - } + var audience *string + if features.IsOIDCAuthentication() { + audience = ptr.To(auth.GetAudience(eventing.SchemeGroupVersion.WithKind("KafkaSink"), ks.ObjectMeta)) + logging.FromContext(ctx).Debugw("Setting the kafkasinks audience", zap.String("audience", *audience)) + } else { + logging.FromContext(ctx).Debug("Clearing the kafkasinks audience as OIDC is not enabled") + audience = nil } - if ks.Status.Address != nil && ks.Status.Address.Audience != nil { - sinkConfig.Ingress.Audience = *ks.Status.Address.Audience + // Get sink configuration. + sinkConfig, err := r.getSinkContractResource(ctx, ks, secret, audience, ks.Status.AppliedEventPoliciesStatus) + if err != nil { + return statusConditionManager.FailedToResolveConfig(err) } - statusConditionManager.ConfigResolved() sinkIndex := coreconfig.FindResource(ct, ks.UID) @@ -245,7 +234,6 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink) logger.Debug("Updated receiver pod annotation") - features := feature.FromContext(ctx) var addressableStatus duckv1.AddressStatus if features.IsPermissiveTransportEncryption() { caCerts, err := r.getCaCerts() @@ -253,8 +241,8 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink) return err } - httpAddress := receiver.HTTPAddress(r.IngressHost, nil, ks) - httpsAddress := receiver.HTTPSAddress(r.IngressHost, nil, ks, caCerts) + httpAddress := receiver.HTTPAddress(r.IngressHost, audience, ks) + httpsAddress := receiver.HTTPSAddress(r.IngressHost, audience, ks, caCerts) // Permissive mode: // - status.address http address with path-based routing // - status.addresses: @@ -271,14 +259,14 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink) if err != nil { return err } - httpsAddress := receiver.HTTPSAddress(r.IngressHost, nil, ks, caCerts) + httpsAddress := receiver.HTTPSAddress(r.IngressHost, audience, ks, caCerts) addressableStatus.Address = &httpsAddress addressableStatus.Addresses = []duckv1.Addressable{httpsAddress} } else { // Disabled mode: // Unchange - httpAddress := receiver.HTTPAddress(r.IngressHost, nil, ks) + httpAddress := receiver.HTTPAddress(r.IngressHost, audience, ks) addressableStatus.Address = &httpAddress addressableStatus.Addresses = []duckv1.Addressable{httpAddress} @@ -301,25 +289,6 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink) ks.Status.AddressStatus = addressableStatus - if features.IsOIDCAuthentication() { - audience := auth.GetAudience(eventing.SchemeGroupVersion.WithKind("KafkaSink"), ks.ObjectMeta) - logging.FromContext(ctx).Debugw("Setting the kafkasinks audience", zap.String("audience", audience)) - ks.Status.Address.Audience = &audience - - for i := range ks.Status.Addresses { - ks.Status.Addresses[i].Audience = &audience - } - } else { - logging.FromContext(ctx).Debug("Clearing the kafkasinks audience as OIDC is not enabled") - if ks.Status.Address != nil { - ks.Status.Address.Audience = nil - } - - for i := range ks.Status.Addresses { - ks.Status.Addresses[i].Audience = nil - } - } - ks.GetConditionSet().Manage(ks.GetStatus()).MarkTrue(base.ConditionAddressable) return nil @@ -461,3 +430,46 @@ func (r *Reconciler) setTrustBundles(ct *contract.Contract) error { ct.TrustBundles = tb return nil } + +func (r *Reconciler) getSinkContractResource(ctx context.Context, kafkaSink *eventingv1alpha1.KafkaSink, secret *corev1.Secret, audience *string, appliedEventPoliciesStatus eventingduck.AppliedEventPoliciesStatus) (*contract.Resource, error) { + features := feature.FromContext(ctx) + sinkConfig := &contract.Resource{ + Uid: string(kafkaSink.UID), + Topics: []string{kafkaSink.Spec.Topic}, + Ingress: &contract.Ingress{ + Path: receiver.PathFromObject(kafkaSink), + ContentMode: coreconfig.ContentModeFromString(*kafkaSink.Spec.ContentMode), + EnableAutoCreateEventTypes: features.IsEnabled(feature.EvenTypeAutoCreate), + }, + BootstrapServers: kafka.BootstrapServersCommaSeparated(kafkaSink.Spec.BootstrapServers), + Reference: &contract.Reference{ + Uuid: string(kafkaSink.GetUID()), + Namespace: kafkaSink.GetNamespace(), + Name: kafkaSink.GetName(), + Kind: "KafkaSink", + GroupVersion: eventingv1alpha1.SchemeGroupVersion.String(), + }, + } + if kafkaSink.Spec.HasAuthConfig() { + sinkConfig.Auth = &contract.Resource_AuthSecret{ + AuthSecret: &contract.Reference{ + Uuid: string(secret.UID), + Namespace: secret.Namespace, + Name: secret.Name, + Version: secret.ResourceVersion, + }, + } + } + + if audience != nil { + sinkConfig.Ingress.Audience = *audience + } + + eventPolicies, err := coreconfig.EventPoliciesFromAppliedEventPoliciesStatus(appliedEventPoliciesStatus, r.EventPolicyLister, kafkaSink.Namespace, features) + if err != nil { + return nil, fmt.Errorf("could not get eventpolicies from kafkasink status: %w", err) + } + sinkConfig.Ingress.EventPolicies = eventPolicies + + return sinkConfig, nil +} diff --git a/control-plane/pkg/reconciler/sink/kafka_sink_test.go b/control-plane/pkg/reconciler/sink/kafka_sink_test.go index 1583b4ea0e..494d38a96d 100644 --- a/control-plane/pkg/reconciler/sink/kafka_sink_test.go +++ b/control-plane/pkg/reconciler/sink/kafka_sink_test.go @@ -1331,9 +1331,13 @@ func sinkReconciliation(t *testing.T, format string, env config.Env) { ConfigMapUpdate(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, env.ContractConfigMapFormat, &contract.Contract{ Resources: []*contract.Resource{ { - Uid: SinkUUID, - Topics: []string{SinkTopic()}, - Ingress: &contract.Ingress{ContentMode: contract.ContentMode_BINARY, Path: receiver.Path(SinkNamespace, SinkName)}, + Uid: SinkUUID, + Topics: []string{SinkTopic()}, + Ingress: &contract.Ingress{ + ContentMode: contract.ContentMode_BINARY, + Path: receiver.Path(SinkNamespace, SinkName), + Audience: sinkAudience, + }, BootstrapServers: bootstrapServers, Reference: SinkReference(), }, @@ -1755,7 +1759,8 @@ func useTable(t *testing.T, table TableTest, env *config.Env) { DataPlaneNamespace: env.SystemNamespace, ReceiverLabel: base.SinkReceiverLabel, }, - ConfigMapLister: listers.GetConfigMapLister(), + ConfigMapLister: listers.GetConfigMapLister(), + EventPolicyLister: listers.GetEventPolicyLister(), GetKafkaClusterAdmin: func(_ context.Context, _ []string, _ *corev1.Secret) (sarama.ClusterAdmin, error) { return &kafkatesting.MockKafkaClusterAdmin{ ExpectedTopicName: expectedTopicName,