Skip to content

Commit

Permalink
Provision EventPolicies in kafkachannel contract
Browse files Browse the repository at this point in the history
  • Loading branch information
creydr committed Aug 20, 2024
1 parent a680a8c commit 84c0834
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 17 deletions.
40 changes: 25 additions & 15 deletions control-plane/pkg/reconciler/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
eventingv1alpha1listers "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -109,6 +110,7 @@ type Reconciler struct {
ConfigMapLister corelisters.ConfigMapLister
ServiceLister corelisters.ServiceLister
SubscriptionLister messaginglisters.SubscriptionLister
EventPolicyLister eventingv1alpha1listers.EventPolicyLister

Prober prober.NewProber

Expand All @@ -121,6 +123,7 @@ type Reconciler struct {

func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta1.KafkaChannel) reconciler.Event {
logger := kafkalogging.CreateReconcileMethodLogger(ctx, channel)
featureFlags := feature.FromContext(ctx)

statusConditionManager := base.StatusConditionManager{
Object: channel,
Expand Down Expand Up @@ -219,8 +222,17 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta
return statusConditionManager.FailedToResolveConfig(err)
}

var audience *string
if featureFlags.IsOIDCAuthentication() {
audience = pointer.String(auth.GetAudience(messaging.SchemeGroupVersion.WithKind("KafkaChannel"), channel.ObjectMeta))
logging.FromContext(ctx).Debugw("Setting the KafkaChannels audience", zap.String("audience", *audience))
} else {
logging.FromContext(ctx).Debug("Clearing the KafkaChannels audience as OIDC is not enabled")
audience = nil
}

// Get resource configuration
channelResource, err := r.getChannelContractResource(ctx, topic, channel, authContext, topicConfig)
channelResource, err := r.getChannelContractResource(ctx, topic, channel, authContext, topicConfig, audience)
if err != nil {
return statusConditionManager.FailedToResolveConfig(err)
}
Expand Down Expand Up @@ -275,16 +287,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta
return err
}

featureFlags := feature.FromContext(ctx)
var audience *string
if featureFlags.IsOIDCAuthentication() {
audience = pointer.String(auth.GetAudience(messaging.SchemeGroupVersion.WithKind("KafkaChannel"), channel.ObjectMeta))
logging.FromContext(ctx).Debugw("Setting the KafkaChannels audience", zap.String("audience", *audience))
} else {
logging.FromContext(ctx).Debug("Clearing the KafkaChannels audience as OIDC is not enabled")
audience = nil
}

var addressableStatus duckv1.AddressStatus
channelHttpsHost := network.GetServiceHostname(r.Env.IngressName, r.SystemNamespace)
channelHttpHost := network.GetServiceHostname(channelService.Name, channel.Namespace)
Expand Down Expand Up @@ -685,13 +687,15 @@ func (r *Reconciler) reconcileConsumerGroup(ctx context.Context, channel *messag
return cg, nil
}

func (r *Reconciler) getChannelContractResource(ctx context.Context, topic string, channel *messagingv1beta1.KafkaChannel, auth *security.NetSpecAuthContext, config *kafka.TopicConfig) (*contract.Resource, error) {
func (r *Reconciler) getChannelContractResource(ctx context.Context, topic string, channel *messagingv1beta1.KafkaChannel, auth *security.NetSpecAuthContext, config *kafka.TopicConfig, audience *string) (*contract.Resource, error) {
features := feature.FromContext(ctx)

resource := &contract.Resource{
Uid: string(channel.UID),
Topics: []string{topic},
Ingress: &contract.Ingress{
Host: receiver.Host(channel.GetNamespace(), channel.GetName()),
EnableAutoCreateEventTypes: feature.FromContext(ctx).IsEnabled(feature.EvenTypeAutoCreate),
EnableAutoCreateEventTypes: features.IsEnabled(feature.EvenTypeAutoCreate),
Path: receiver.Path(channel.GetNamespace(), channel.GetName()),
},
BootstrapServers: config.GetBootstrapServers(),
Expand All @@ -710,9 +714,15 @@ func (r *Reconciler) getChannelContractResource(ctx context.Context, topic strin
}
}

if channel.Status.Address != nil && channel.Status.Address.Audience != nil {
resource.Ingress.Audience = *channel.Status.Address.Audience
if audience != nil {
resource.Ingress.Audience = *audience
}

eventPolicies, err := coreconfig.EventPoliciesFromAppliedEventPoliciesStatus(channel.Status.AppliedEventPoliciesStatus, r.EventPolicyLister, channel.Namespace, features)
if err != nil {
return nil, fmt.Errorf("could not get eventpolicies from channel status: %w", err)
}
resource.Ingress.EventPolicies = eventPolicies

egressConfig, err := coreconfig.EgressConfigFromDelivery(ctx, r.Resolver, channel, channel.Spec.Delivery, r.DefaultBackoffDelayMs)
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions control-plane/pkg/reconciler/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1964,8 +1964,9 @@ func TestReconcileKind(t *testing.T) {
BootstrapServers: ChannelBootstrapServers,
Reference: ChannelReference(),
Ingress: &contract.Ingress{
Host: receiver.Host(ChannelNamespace, ChannelName),
Path: receiver.Path(ChannelNamespace, ChannelName),
Host: receiver.Host(ChannelNamespace, ChannelName),
Path: receiver.Path(ChannelNamespace, ChannelName),
Audience: ChannelAudience,
},
},
},
Expand Down Expand Up @@ -2085,6 +2086,7 @@ func TestReconcileKind(t *testing.T) {
ServiceLister: listers.GetServiceLister(),
SubscriptionLister: listers.GetSubscriptionLister(),
ConsumerGroupLister: listers.GetConsumerGroupLister(),
EventPolicyLister: listers.GetEventPolicyLister(),
InternalsClient: fakeconsumergroupinformer.Get(ctx),
Prober: proberMock,
IngressHost: network.GetServiceHostname(env.IngressName, env.SystemNamespace),
Expand Down
3 changes: 3 additions & 0 deletions control-plane/pkg/reconciler/channel/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package channel
import (
"context"
"fmt"
eventpolicyinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy"
"net"
"net/http"

Expand Down Expand Up @@ -63,6 +64,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
configmapInformer := configmapinformer.Get(ctx)
channelInformer := kafkachannelinformer.Get(ctx)
consumerGroupInformer := consumergroupinformer.Get(ctx)
eventPolicyInformer := eventpolicyinformer.Get(ctx)

messagingv1beta.RegisterAlternateKafkaChannelConditionSet(conditionSet)

Expand All @@ -85,6 +87,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
ServiceLister: serviceinformer.Get(ctx).Lister(),
SubscriptionLister: subscriptioninformer.Get(ctx).Lister(),
ConsumerGroupLister: consumerGroupInformer.Lister(),
EventPolicyLister: eventPolicyInformer.Lister(),
InternalsClient: consumergroupclient.Get(ctx),
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
}
Expand Down
1 change: 1 addition & 0 deletions control-plane/pkg/reconciler/channel/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config"
_ "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/informers/messaging/v1beta1/kafkachannel/fake"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool"
_ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake"
"knative.dev/eventing/pkg/eventingtls/eventingtlstesting"
_ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake"
Expand Down

0 comments on commit 84c0834

Please sign in to comment.