Skip to content

Commit

Permalink
Provision EventPolicies in broker contract
Browse files Browse the repository at this point in the history
  • Loading branch information
creydr committed Aug 20, 2024
1 parent f0b3f25 commit a680a8c
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 46 deletions.
71 changes: 35 additions & 36 deletions control-plane/pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package broker
import (
"context"
"fmt"
"k8s.io/utils/ptr"
"strings"
"time"

Expand Down Expand Up @@ -99,6 +100,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, broker *eventing.Broker)

func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker) reconciler.Event {
logger := kafkalogging.CreateReconcileMethodLogger(ctx, broker)
features := feature.FromContext(ctx)

statusConditionManager := base.StatusConditionManager{
Object: broker,
Expand Down Expand Up @@ -177,8 +179,22 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker)
return statusConditionManager.FailedToResolveConfig(err)
}

var audience *string
if features.IsOIDCAuthentication() {
audience = ptr.To(auth.GetAudience(eventing.SchemeGroupVersion.WithKind("Broker"), broker.ObjectMeta))
logging.FromContext(ctx).Debugw("Setting the brokers audience", zap.String("audience", *audience))
} else {
logging.FromContext(ctx).Debug("Clearing the brokers audience as OIDC is not enabled")
audience = nil
}

err = auth.UpdateStatusWithEventPolicies(features, &broker.Status.AppliedEventPoliciesStatus, &broker.Status, r.EventPolicyLister, eventing.SchemeGroupVersion.WithKind("Broker"), broker.ObjectMeta)
if err != nil {
return fmt.Errorf("could not update broker status with EventPolicies: %v", err)
}

// Get resource configuration.
brokerResource, err := r.reconcilerBrokerResource(ctx, topic, broker, secret, topicConfig)
brokerResource, err := r.reconcilerBrokerResource(ctx, topic, broker, secret, topicConfig, audience)
if err != nil {
return statusConditionManager.FailedToResolveConfig(err)
}
Expand Down Expand Up @@ -236,29 +252,28 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker)

ingressHost := network.GetServiceHostname(r.Env.IngressName, r.DataPlaneNamespace)

transportEncryptionFlags := feature.FromContext(ctx)
var addressableStatus duckv1.AddressStatus
if transportEncryptionFlags.IsPermissiveTransportEncryption() {
if features.IsPermissiveTransportEncryption() {
caCerts, err := r.getCaCerts()
if err != nil {
return err
}

httpAddress := receiver.HTTPAddress(ingressHost, nil, broker)
httpsAddress := receiver.HTTPSAddress(ingressHost, nil, broker, caCerts)
httpAddress := receiver.HTTPAddress(ingressHost, audience, broker)
httpsAddress := receiver.HTTPSAddress(ingressHost, audience, broker, caCerts)
addressableStatus.Address = &httpAddress
addressableStatus.Addresses = []duckv1.Addressable{httpAddress, httpsAddress}
} else if transportEncryptionFlags.IsStrictTransportEncryption() {
} else if features.IsStrictTransportEncryption() {
caCerts, err := r.getCaCerts()
if err != nil {
return err
}

httpsAddress := receiver.HTTPSAddress(ingressHost, nil, broker, caCerts)
httpsAddress := receiver.HTTPSAddress(ingressHost, audience, broker, caCerts)
addressableStatus.Address = &httpsAddress
addressableStatus.Addresses = []duckv1.Addressable{httpsAddress}
} else {
httpAddress := receiver.HTTPAddress(ingressHost, nil, broker)
httpAddress := receiver.HTTPAddress(ingressHost, audience, broker)
addressableStatus.Address = &httpAddress
addressableStatus.Addresses = []duckv1.Addressable{httpAddress}
}
Expand All @@ -279,32 +294,8 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker)
broker.Status.Address = addressableStatus.Address
broker.Status.Addresses = addressableStatus.Addresses

if feature.FromContext(ctx).IsOIDCAuthentication() {
audience := auth.GetAudience(eventing.SchemeGroupVersion.WithKind("Broker"), broker.ObjectMeta)
logging.FromContext(ctx).Debugw("Setting the brokers audience", zap.String("audience", audience))
broker.Status.Address.Audience = &audience

for i := range broker.Status.Addresses {
broker.Status.Addresses[i].Audience = &audience
}
} else {
logging.FromContext(ctx).Debug("Clearing the brokers audience as OIDC is not enabled")
if broker.Status.Address != nil {
broker.Status.Address.Audience = nil
}

for i := range broker.Status.Addresses {
broker.Status.Addresses[i].Audience = nil
}
}

broker.GetConditionSet().Manage(broker.GetStatus()).MarkTrue(base.ConditionAddressable)

err = auth.UpdateStatusWithEventPolicies(feature.FromContext(ctx), &broker.Status.AppliedEventPoliciesStatus, &broker.Status, r.EventPolicyLister, eventing.SchemeGroupVersion.WithKind("Broker"), broker.ObjectMeta)
if err != nil {
return fmt.Errorf("could not update broker status with EventPolicies: %v", err)
}

return nil
}

Expand Down Expand Up @@ -629,13 +620,15 @@ func rebuildCMFromStatusAnnotations(br *eventing.Broker) *corev1.ConfigMap {
return cm
}

func (r *Reconciler) reconcilerBrokerResource(ctx context.Context, topic string, broker *eventing.Broker, secret *corev1.Secret, config *kafka.TopicConfig) (*contract.Resource, error) {
func (r *Reconciler) reconcilerBrokerResource(ctx context.Context, topic string, broker *eventing.Broker, secret *corev1.Secret, config *kafka.TopicConfig, audience *string) (*contract.Resource, error) {
features := feature.FromContext(ctx)

resource := &contract.Resource{
Uid: string(broker.UID),
Topics: []string{topic},
Ingress: &contract.Ingress{
Path: receiver.PathFromObject(broker),
EnableAutoCreateEventTypes: feature.FromContext(ctx).IsEnabled(feature.EvenTypeAutoCreate),
EnableAutoCreateEventTypes: features.IsEnabled(feature.EvenTypeAutoCreate),
},
BootstrapServers: config.GetBootstrapServers(),
Reference: &contract.Reference{
Expand All @@ -658,8 +651,8 @@ func (r *Reconciler) reconcilerBrokerResource(ctx context.Context, topic string,
}
}

if broker.Status.Address != nil && broker.Status.Address.Audience != nil {
resource.Ingress.Audience = *broker.Status.Address.Audience
if audience != nil {
resource.Ingress.Audience = *audience
}

egressConfig, err := coreconfig.EgressConfigFromDelivery(ctx, r.Resolver, broker, broker.Spec.Delivery, r.DefaultBackoffDelayMs)
Expand All @@ -668,6 +661,12 @@ func (r *Reconciler) reconcilerBrokerResource(ctx context.Context, topic string,
}
resource.EgressConfig = egressConfig

eventPolicies, err := coreconfig.EventPoliciesFromAppliedEventPoliciesStatus(broker.Status.AppliedEventPoliciesStatus, r.EventPolicyLister, broker.Namespace, features)
if err != nil {
return nil, fmt.Errorf("could not get eventpolicies from broker status: %w", err)
}
resource.Ingress.EventPolicies = eventPolicies

return resource, nil
}

Expand Down
81 changes: 71 additions & 10 deletions control-plane/pkg/reconciler/broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
StatusBrokerDataPlaneAvailable,
StatusBrokerConfigParsed,
StatusBrokerTopicReady,
reconcilertesting.WithBrokerEventPoliciesReadyBecauseOIDCDisabled(),
StatusBrokerProbeFailed(prober.StatusNotReady),
BrokerConfigMapAnnotations(),
WithTopicStatusAnnotation(BrokerTopic()),
Expand Down Expand Up @@ -462,6 +463,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
StatusBrokerDataPlaneAvailable,
StatusBrokerConfigParsed,
StatusBrokerTopicReady,
reconcilertesting.WithBrokerEventPoliciesReadyBecauseOIDCDisabled(),
BrokerConfigMapAnnotations(),
WithTopicStatusAnnotation(BrokerTopic()),
StatusBrokerProbeFailed(prober.StatusUnknown),
Expand Down Expand Up @@ -1257,6 +1259,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
StatusBrokerDataPlaneAvailable,
StatusBrokerConfigNotParsed("failed to resolve Spec.Delivery.DeadLetterSink: destination missing Ref and URI, expected at least one"),
StatusBrokerTopicReady,
reconcilertesting.WithBrokerEventPoliciesReadyBecauseOIDCDisabled(),
BrokerConfigMapAnnotations(),
WithTopicStatusAnnotation(BrokerTopic()),
),
Expand Down Expand Up @@ -2283,9 +2286,27 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
ConfigMapUpdate(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, env.ContractConfigMapFormat, &contract.Contract{
Resources: []*contract.Resource{
{
Uid: BrokerUUID,
Topics: []string{BrokerTopic()},
Ingress: &contract.Ingress{Path: receiver.Path(BrokerNamespace, BrokerName)},
Uid: BrokerUUID,
Topics: []string{BrokerTopic()},
Ingress: &contract.Ingress{
Path: receiver.Path(BrokerNamespace, BrokerName),
Audience: brokerAudience,
EventPolicies: []*contract.EventPolicy{
{
TokenMatchers: []*contract.TokenMatcher{
{
Matcher: &contract.TokenMatcher_Prefix{
Prefix: &contract.Prefix{
Attributes: map[string]string{
"sub": "system:serviceaccount:" + BrokerNamespace + ":",
},
},
},
},
},
},
},
},
BootstrapServers: bootstrapServers,
Reference: BrokerReference(),
Auth: &contract.Resource_AuthSecret{
Expand Down Expand Up @@ -2373,6 +2394,9 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
reconcilertesting.NewEventPolicy(readyEventPolicyName, BrokerNamespace,
reconcilertesting.WithReadyEventPolicyCondition,
reconcilertesting.WithEventPolicyToRef(brokerV1GVK, BrokerName),
reconcilertesting.WithEventPolicyStatusFromSub([]string{
"sub",
}),
),
},
Key: testKey,
Expand All @@ -2383,9 +2407,26 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
ConfigMapUpdate(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, env.ContractConfigMapFormat, &contract.Contract{
Resources: []*contract.Resource{
{
Uid: BrokerUUID,
Topics: []string{BrokerTopic()},
Ingress: &contract.Ingress{Path: receiver.Path(BrokerNamespace, BrokerName)},
Uid: BrokerUUID,
Topics: []string{BrokerTopic()},
Ingress: &contract.Ingress{
Path: receiver.Path(BrokerNamespace, BrokerName),
Audience: brokerAudience,
EventPolicies: []*contract.EventPolicy{
{
TokenMatchers: []*contract.TokenMatcher{
{
Matcher: &contract.TokenMatcher_Exact{
Exact: &contract.Exact{
Attributes: map[string]string{
"sub": "sub",
},
},
},
},
},
},
}},
BootstrapServers: bootstrapServers,
Reference: BrokerReference(),
},
Expand Down Expand Up @@ -2456,6 +2497,7 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
reconcilertesting.NewEventPolicy(unreadyEventPolicyName, BrokerNamespace,
reconcilertesting.WithUnreadyEventPolicyCondition("", ""),
reconcilertesting.WithEventPolicyToRef(brokerV1GVK, BrokerName),
reconcilertesting.WithEventPolicyStatusFromSub([]string{"sub"}),
),
},
Key: testKey,
Expand All @@ -2466,9 +2508,27 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
ConfigMapUpdate(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, env.ContractConfigMapFormat, &contract.Contract{
Resources: []*contract.Resource{
{
Uid: BrokerUUID,
Topics: []string{BrokerTopic()},
Ingress: &contract.Ingress{Path: receiver.Path(BrokerNamespace, BrokerName)},
Uid: BrokerUUID,
Topics: []string{BrokerTopic()},
Ingress: &contract.Ingress{
Path: receiver.Path(BrokerNamespace, BrokerName),
Audience: brokerAudience,
EventPolicies: []*contract.EventPolicy{
{
TokenMatchers: []*contract.TokenMatcher{
{
Matcher: &contract.TokenMatcher_Prefix{
Prefix: &contract.Prefix{
Attributes: map[string]string{
"sub": "system:serviceaccount:" + BrokerNamespace + ":",
},
},
},
},
},
},
},
},
BootstrapServers: bootstrapServers,
Reference: BrokerReference(),
},
Expand Down Expand Up @@ -2517,7 +2577,8 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
},
},
Ctx: feature.ToContext(context.Background(), feature.Flags{
feature.OIDCAuthentication: feature.Enabled,
feature.OIDCAuthentication: feature.Enabled,
feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace,
}),
},
}
Expand Down

0 comments on commit a680a8c

Please sign in to comment.