Skip to content

Commit

Permalink
Provision EventPolicies in kafkasink contract
Browse files Browse the repository at this point in the history
  • Loading branch information
creydr committed Aug 20, 2024
1 parent 84c0834 commit 1e654a2
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 59 deletions.
3 changes: 3 additions & 0 deletions control-plane/pkg/reconciler/sink/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package sink
import (
"context"
"fmt"
eventpolicyinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy"
"net"
"net/http"

Expand Down Expand Up @@ -53,6 +54,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
logger := logging.FromContext(ctx)

configmapInformer := configmapinformer.Get(ctx)
eventPolicyInformer := eventpolicyinformer.Get(ctx)

clientPool := clientpool.Get(ctx)

Expand All @@ -68,6 +70,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
ReceiverLabel: base.SinkReceiverLabel,
},
ConfigMapLister: configmapInformer.Lister(),
EventPolicyLister: eventPolicyInformer.Lister(),
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
Env: configs,
}
Expand Down
1 change: 1 addition & 0 deletions control-plane/pkg/reconciler/sink/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/stretchr/testify/assert"
reconcilertesting "knative.dev/pkg/reconciler/testing"

_ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake"
_ "knative.dev/pkg/client/injection/kube/client/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/fake"
Expand Down
119 changes: 64 additions & 55 deletions control-plane/pkg/reconciler/sink/kafka_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ package sink
import (
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
eventingv1alpha1listers "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1"
"time"

"k8s.io/utils/ptr"
"knative.dev/eventing/pkg/auth"
"knative.dev/pkg/logging"

Expand Down Expand Up @@ -66,7 +69,8 @@ type Reconciler struct {

Resolver *resolver.URIResolver

ConfigMapLister corelisters.ConfigMapLister
ConfigMapLister corelisters.ConfigMapLister
EventPolicyLister eventingv1alpha1listers.EventPolicyLister

// GetKafkaClusterAdmin creates new sarama ClusterAdmin. It's convenient to add this as Reconciler field so that we can
// mock the function used during the reconciliation loop.
Expand All @@ -84,6 +88,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, ks *eventing.KafkaSink)

func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink) error {
logger := kafkalogging.CreateReconcileMethodLogger(ctx, ks)
features := feature.FromContext(ctx)

statusConditionManager := base.StatusConditionManager{
Object: ks,
Expand Down Expand Up @@ -180,39 +185,20 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink)
zap.Any("contract", ct),
)

// Get sink configuration.
sinkConfig := &contract.Resource{
Uid: string(ks.UID),
Topics: []string{ks.Spec.Topic},
Ingress: &contract.Ingress{
Path: receiver.PathFromObject(ks),
ContentMode: coreconfig.ContentModeFromString(*ks.Spec.ContentMode),
EnableAutoCreateEventTypes: feature.FromContext(ctx).IsEnabled(feature.EvenTypeAutoCreate),
},
BootstrapServers: kafka.BootstrapServersCommaSeparated(ks.Spec.BootstrapServers),
Reference: &contract.Reference{
Uuid: string(ks.GetUID()),
Namespace: ks.GetNamespace(),
Name: ks.GetName(),
Kind: "KafkaSink",
GroupVersion: eventingv1alpha1.SchemeGroupVersion.String(),
},
}
if ks.Spec.HasAuthConfig() {
sinkConfig.Auth = &contract.Resource_AuthSecret{
AuthSecret: &contract.Reference{
Uuid: string(secret.UID),
Namespace: secret.Namespace,
Name: secret.Name,
Version: secret.ResourceVersion,
},
}
var audience *string
if features.IsOIDCAuthentication() {
audience = ptr.To(auth.GetAudience(eventing.SchemeGroupVersion.WithKind("KafkaSink"), ks.ObjectMeta))
logging.FromContext(ctx).Debugw("Setting the kafkasinks audience", zap.String("audience", *audience))
} else {
logging.FromContext(ctx).Debug("Clearing the kafkasinks audience as OIDC is not enabled")
audience = nil
}

if ks.Status.Address != nil && ks.Status.Address.Audience != nil {
sinkConfig.Ingress.Audience = *ks.Status.Address.Audience
// Get sink configuration.
sinkConfig, err := r.getSinkContractResource(ctx, ks, secret, audience)
if err != nil {
return statusConditionManager.FailedToResolveConfig(err)
}

statusConditionManager.ConfigResolved()

sinkIndex := coreconfig.FindResource(ct, ks.UID)
Expand Down Expand Up @@ -245,16 +231,15 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink)

logger.Debug("Updated receiver pod annotation")

features := feature.FromContext(ctx)
var addressableStatus duckv1.AddressStatus
if features.IsPermissiveTransportEncryption() {
caCerts, err := r.getCaCerts()
if err != nil {
return err
}

httpAddress := receiver.HTTPAddress(r.IngressHost, nil, ks)
httpsAddress := receiver.HTTPSAddress(r.IngressHost, nil, ks, caCerts)
httpAddress := receiver.HTTPAddress(r.IngressHost, audience, ks)
httpsAddress := receiver.HTTPSAddress(r.IngressHost, audience, ks, caCerts)
// Permissive mode:
// - status.address http address with path-based routing
// - status.addresses:
Expand All @@ -271,14 +256,14 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink)
if err != nil {
return err
}
httpsAddress := receiver.HTTPSAddress(r.IngressHost, nil, ks, caCerts)
httpsAddress := receiver.HTTPSAddress(r.IngressHost, audience, ks, caCerts)

addressableStatus.Address = &httpsAddress
addressableStatus.Addresses = []duckv1.Addressable{httpsAddress}
} else {
// Disabled mode:
// Unchange
httpAddress := receiver.HTTPAddress(r.IngressHost, nil, ks)
httpAddress := receiver.HTTPAddress(r.IngressHost, audience, ks)

addressableStatus.Address = &httpAddress
addressableStatus.Addresses = []duckv1.Addressable{httpAddress}
Expand All @@ -301,25 +286,6 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink)

ks.Status.AddressStatus = addressableStatus

if features.IsOIDCAuthentication() {
audience := auth.GetAudience(eventing.SchemeGroupVersion.WithKind("KafkaSink"), ks.ObjectMeta)
logging.FromContext(ctx).Debugw("Setting the kafkasinks audience", zap.String("audience", audience))
ks.Status.Address.Audience = &audience

for i := range ks.Status.Addresses {
ks.Status.Addresses[i].Audience = &audience
}
} else {
logging.FromContext(ctx).Debug("Clearing the kafkasinks audience as OIDC is not enabled")
if ks.Status.Address != nil {
ks.Status.Address.Audience = nil
}

for i := range ks.Status.Addresses {
ks.Status.Addresses[i].Audience = nil
}
}

ks.GetConditionSet().Manage(ks.GetStatus()).MarkTrue(base.ConditionAddressable)

return nil
Expand Down Expand Up @@ -461,3 +427,46 @@ func (r *Reconciler) setTrustBundles(ct *contract.Contract) error {
ct.TrustBundles = tb
return nil
}

// getSinkContractResource builds the data-plane contract.Resource for the given
// KafkaSink: its topic, ingress settings (path, content mode, optional OIDC
// audience, applied event policies), bootstrap servers, and object reference.
// When the sink configures auth, a reference to the auth secret is attached.
// Returns an error if the event policies from the sink status cannot be resolved.
func (r *Reconciler) getSinkContractResource(ctx context.Context, kafkaSink *eventingv1alpha1.KafkaSink, secret *corev1.Secret, audience *string) (*contract.Resource, error) {
	features := feature.FromContext(ctx)

	// Assemble the ingress first; the remaining fields reference it.
	ingress := &contract.Ingress{
		Path:                       receiver.PathFromObject(kafkaSink),
		ContentMode:                coreconfig.ContentModeFromString(*kafkaSink.Spec.ContentMode),
		EnableAutoCreateEventTypes: features.IsEnabled(feature.EvenTypeAutoCreate),
	}
	if audience != nil {
		ingress.Audience = *audience
	}

	res := &contract.Resource{
		Uid:              string(kafkaSink.UID),
		Topics:           []string{kafkaSink.Spec.Topic},
		Ingress:          ingress,
		BootstrapServers: kafka.BootstrapServersCommaSeparated(kafkaSink.Spec.BootstrapServers),
		Reference: &contract.Reference{
			Uuid:         string(kafkaSink.GetUID()),
			Namespace:    kafkaSink.GetNamespace(),
			Name:         kafkaSink.GetName(),
			Kind:         "KafkaSink",
			GroupVersion: eventingv1alpha1.SchemeGroupVersion.String(),
		},
	}

	// Reference the auth secret only when the sink actually configures auth.
	if kafkaSink.Spec.HasAuthConfig() {
		res.Auth = &contract.Resource_AuthSecret{
			AuthSecret: &contract.Reference{
				Uuid:      string(secret.UID),
				Namespace: secret.Namespace,
				Name:      secret.Name,
				Version:   secret.ResourceVersion,
			},
		}
	}

	// Resolve the event policies applied to this sink so the data plane can
	// enforce them at the ingress.
	eventPolicies, err := coreconfig.EventPoliciesFromAppliedEventPoliciesStatus(kafkaSink.Status.AppliedEventPoliciesStatus, r.EventPolicyLister, kafkaSink.Namespace, features)
	if err != nil {
		return nil, fmt.Errorf("could not get eventpolicies from kafkasink status: %w", err)
	}
	res.Ingress.EventPolicies = eventPolicies

	return res, nil
}
13 changes: 9 additions & 4 deletions control-plane/pkg/reconciler/sink/kafka_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1331,9 +1331,13 @@ func sinkReconciliation(t *testing.T, format string, env config.Env) {
ConfigMapUpdate(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, env.ContractConfigMapFormat, &contract.Contract{
Resources: []*contract.Resource{
{
Uid: SinkUUID,
Topics: []string{SinkTopic()},
Ingress: &contract.Ingress{ContentMode: contract.ContentMode_BINARY, Path: receiver.Path(SinkNamespace, SinkName)},
Uid: SinkUUID,
Topics: []string{SinkTopic()},
Ingress: &contract.Ingress{
ContentMode: contract.ContentMode_BINARY,
Path: receiver.Path(SinkNamespace, SinkName),
Audience: sinkAudience,
},
BootstrapServers: bootstrapServers,
Reference: SinkReference(),
},
Expand Down Expand Up @@ -1755,7 +1759,8 @@ func useTable(t *testing.T, table TableTest, env *config.Env) {
DataPlaneNamespace: env.SystemNamespace,
ReceiverLabel: base.SinkReceiverLabel,
},
ConfigMapLister: listers.GetConfigMapLister(),
ConfigMapLister: listers.GetConfigMapLister(),
EventPolicyLister: listers.GetEventPolicyLister(),
GetKafkaClusterAdmin: func(_ context.Context, _ []string, _ *corev1.Secret) (sarama.ClusterAdmin, error) {
return &kafkatesting.MockKafkaClusterAdmin{
ExpectedTopicName: expectedTopicName,
Expand Down

0 comments on commit 1e654a2

Please sign in to comment.