From 16393a7ccafac2e2132346ac6de73fe87c3c992f Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Wed, 25 Sep 2024 13:21:18 +0200 Subject: [PATCH] Set the `name` value for metrics tags to the correct top-level resource Previously to v2, we were setting `name` to the Broker or Channel name of the resource. Signed-off-by: Pierangelo Di Pilato --- .../eventing/v1alpha1/consumer_group_types.go | 12 +++++++ .../pkg/reconciler/channel/channel.go | 12 +++++-- .../pkg/reconciler/channel/channel_test.go | 29 +++++++++++++-- .../pkg/reconciler/consumer/consumer.go | 35 ++++++++++++++++++- .../testing/objects_consumergroup.go | 6 ++++ .../pkg/reconciler/trigger/v2/triggerv2.go | 7 ++++ .../reconciler/trigger/v2/triggerv2_test.go | 28 +++++++++++++++ 7 files changed, 124 insertions(+), 5 deletions(-) diff --git a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go index aae63d4110..2e0632e3d0 100644 --- a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go +++ b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go @@ -19,6 +19,7 @@ package v1alpha1 import ( "strings" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" @@ -108,6 +109,10 @@ type ConsumerGroupSpec struct { // OIDCServiceAccountName is the name of service account used for this components // OIDC authentication. OIDCServiceAccountName *string `json:"oidcServiceAccountName,omitempty"` + + // TopLevelResourceRef is a reference to a top level resource. + // For a ConsumerGroup associated with a Trigger, a Broker reference will be set. + TopLevelResourceRef *corev1.ObjectReference `json:"topLevelResourceRef,omitempty"` } type ConsumerGroupStatus struct { @@ -210,6 +215,13 @@ func (cg *ConsumerGroup) GetUserFacingResourceRef() *metav1.OwnerReference { return nil } +// GetTopLevelUserFacingResourceRef gets the top level resource reference to the user-facing resources +// that are backed by this ConsumerGroup using the OwnerReference list. +// For example, for a Trigger, it will return a Broker reference. +func (cg *ConsumerGroup) GetTopLevelUserFacingResourceRef() *corev1.ObjectReference { + return cg.Spec.TopLevelResourceRef +} + func (cg *ConsumerGroup) IsNotScheduled() bool { // We want to return true when: // - the condition isn't present, or diff --git a/control-plane/pkg/reconciler/channel/channel.go b/control-plane/pkg/reconciler/channel/channel.go index 49634855f6..ce4edbd851 100644 --- a/control-plane/pkg/reconciler/channel/channel.go +++ b/control-plane/pkg/reconciler/channel/channel.go @@ -42,13 +42,14 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" corelisters "k8s.io/client-go/listers/core/v1" - "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool" - "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/channel/resources" "knative.dev/eventing/pkg/apis/feature" "knative.dev/pkg/network" "knative.dev/pkg/resolver" "knative.dev/pkg/system" + "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool" + "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/channel/resources" + v1 "knative.dev/eventing/pkg/apis/duck/v1" messaging "knative.dev/eventing/pkg/apis/messaging/v1" "knative.dev/pkg/apis" @@ -602,6 +603,13 @@ func (r *Reconciler) reconcileConsumerGroup(ctx context.Context, channel *messag }, }, Spec: internalscg.ConsumerGroupSpec{ + TopLevelResourceRef: &corev1.ObjectReference{ + APIVersion: messagingv1beta1.SchemeGroupVersion.String(), + Kind: "KafkaChannel", + Name: channel.Name, + Namespace: channel.Namespace, + UID: channel.UID, + }, Template: internalscg.ConsumerTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ diff --git a/control-plane/pkg/reconciler/channel/channel_test.go b/control-plane/pkg/reconciler/channel/channel_test.go index fc2da3c328..f7fdc86a43 100644 --- a/control-plane/pkg/reconciler/channel/channel_test.go +++ b/control-plane/pkg/reconciler/channel/channel_test.go @@ -62,11 +62,12 @@ import ( messagingv1beta1kafkachannelreconciler "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel" "github.com/rickb777/date/period" + eventingrekttesting "knative.dev/eventing/pkg/reconciler/testing/v1" + reconcilertesting "knative.dev/eventing/pkg/reconciler/testing/v1" + internalscg "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1" kafkainternals "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1" fakeconsumergroupinformer "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/client/fake" - eventingrekttesting "knative.dev/eventing/pkg/reconciler/testing/v1" - reconcilertesting "knative.dev/eventing/pkg/reconciler/testing/v1" ) const ( @@ -478,6 +479,7 @@ func TestReconcileKind(t *testing.T) { ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription1URI)), ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))), )), + withChannelTopLevelResourceRef(), ), }, WantUpdates: []clientgotesting.UpdateActionImpl{ @@ -551,6 +553,7 @@ func TestReconcileKind(t *testing.T) { ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription1URI)), ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))), )), + withChannelTopLevelResourceRef(), ), }, WantUpdates: []clientgotesting.UpdateActionImpl{ @@ -625,6 +628,7 @@ func TestReconcileKind(t *testing.T) { ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription1URI)), ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))), )), + withChannelTopLevelResourceRef(), ), }, WantUpdates: []clientgotesting.UpdateActionImpl{ @@ -684,6 +688,7 @@ func TestReconcileKind(t *testing.T) { WithConsumerGroupOwnerRef(kmeta.NewControllerRef(NewChannel())), WithConsumerGroupMetaLabels(OwnerAsChannelLabel), ConsumerGroupReady, + withChannelTopLevelResourceRef(), ), }, Key: testKey, @@ -725,6 +730,7 @@ func TestReconcileKind(t *testing.T) { ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))), )), ConsumerGroupReady, + withChannelTopLevelResourceRef(), ), }, }, @@ -779,6 +785,7 @@ func TestReconcileKind(t *testing.T) { )), ConsumerGroupReplicas(1), WithConsumerGroupFailed("failed to reconcile consumer group,", "internal error"), + withChannelTopLevelResourceRef(), ), }, Key: testKey, @@ -858,6 +865,7 @@ func TestReconcileKind(t *testing.T) { ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription1URI)), ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))), )), + withChannelTopLevelResourceRef(), ), NewConsumerGroup( WithConsumerGroupName(Subscription2UUID), @@ -875,6 +883,7 @@ func TestReconcileKind(t *testing.T) { ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription2URI)), ConsumerReply(ConsumerNoReply()), )), + withChannelTopLevelResourceRef(), ), }, WantUpdates: []clientgotesting.UpdateActionImpl{ @@ -946,6 +955,7 @@ func TestReconcileKind(t *testing.T) { ConsumerDelivery(NewConsumerSpecDelivery(kafkasource.Ordered)), ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription2URI)), )), + withChannelTopLevelResourceRef(), ), }, Key: testKey, @@ -967,6 +977,7 @@ func TestReconcileKind(t *testing.T) { ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription1URI)), ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))), )), + withChannelTopLevelResourceRef(), ), }, WantUpdates: []clientgotesting.UpdateActionImpl{ @@ -1218,6 +1229,7 @@ func TestReconcileKind(t *testing.T) { )), ConsumerGroupReplicas(1), ConsumerGroupReady, + withChannelTopLevelResourceRef(), ), }, Key: testKey, @@ -1324,6 +1336,7 @@ func TestReconcileKind(t *testing.T) { )), ConsumerGroupReplicas(1), ConsumerGroupReady, + withChannelTopLevelResourceRef(), ), }, Key: testKey, @@ -1429,6 +1442,7 @@ func TestReconcileKind(t *testing.T) { )), ConsumerGroupReplicas(1), ConsumerGroupReady, + withChannelTopLevelResourceRef(), ), }, Key: testKey, @@ -1528,6 +1542,7 @@ func TestReconcileKind(t *testing.T) { ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription1URI)), ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))), )), + withChannelTopLevelResourceRef(), ), }, WantUpdates: []clientgotesting.UpdateActionImpl{ @@ -2430,3 +2445,13 @@ func httpsURL(name string, namespace string) *apis.URL { Path: fmt.Sprintf("/%s/%s", namespace, name), } } + +func withChannelTopLevelResourceRef() ConsumerGroupOption { + return WithTopLevelResourceRef(&corev1.ObjectReference{ + APIVersion: messagingv1beta.SchemeGroupVersion.String(), + Kind: "KafkaChannel", + Namespace: ChannelNamespace, + Name: ChannelName, + UID: ChannelUUID, + }) +} diff --git a/control-plane/pkg/reconciler/consumer/consumer.go b/control-plane/pkg/reconciler/consumer/consumer.go index 4589758ee2..0ea440f216 100644 --- a/control-plane/pkg/reconciler/consumer/consumer.go +++ b/control-plane/pkg/reconciler/consumer/consumer.go @@ -131,6 +131,14 @@ func (r *Reconciler) reconcileContractResource(ctx context.Context, c *kafkainte egress.VReplicas = 1 } + topLevelUserFacingResourceRef, err := r.reconcileTopLevelUserFacingResourceRef(c) + if err != nil { + return nil, fmt.Errorf("failed to reconcile top-level user facing resource reference: %w", err) + } + if topLevelUserFacingResourceRef == nil { + topLevelUserFacingResourceRef = userFacingResourceRef + } + resource := &contract.Resource{ Uid: string(c.UID), Topics: c.Spec.Topics, @@ -138,7 +146,7 @@ func (r *Reconciler) reconcileContractResource(ctx context.Context, c *kafkainte Egresses: []*contract.Egress{egress}, Auth: nil, // Auth will be added by reconcileAuth CloudEventOverrides: reconcileCEOverrides(c), - Reference: userFacingResourceRef, + Reference: topLevelUserFacingResourceRef, FeatureFlags: &contract.FeatureFlags{ EnableEventTypeAutocreate: feature.FromContext(ctx).IsEnabled(feature.EvenTypeAutoCreate), }, @@ -303,6 +311,31 @@ func (r *Reconciler) reconcileUserFacingResourceRef(c *kafkainternals.Consumer) return ref, nil } +func (r *Reconciler) reconcileTopLevelUserFacingResourceRef(c *kafkainternals.Consumer) (*contract.Reference, error) { + + cg, err := r.ConsumerGroupLister.ConsumerGroups(c.GetNamespace()).Get(c.GetConsumerGroup().Name) + if apierrors.IsNotFound(err) { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("failed to get %s: %w", kafkainternals.ConsumerGroupGroupVersionKind.Kind, err) + } + + userFacingResource := cg.GetTopLevelUserFacingResourceRef() + if userFacingResource == nil { + return nil, nil + } + + ref := &contract.Reference{ + Uuid: string(userFacingResource.UID), + Namespace: c.GetNamespace(), + Name: userFacingResource.Name, + Kind: userFacingResource.Kind, + GroupVersion: userFacingResource.APIVersion, + } + return ref, nil +} + func reconcileDeliveryOrder(c *kafkainternals.Consumer) contract.DeliveryOrder { if c.Spec.Delivery == nil { return contract.DeliveryOrder_UNORDERED diff --git a/control-plane/pkg/reconciler/testing/objects_consumergroup.go b/control-plane/pkg/reconciler/testing/objects_consumergroup.go index a8a3a6171d..f0ee91623a 100644 --- a/control-plane/pkg/reconciler/testing/objects_consumergroup.go +++ b/control-plane/pkg/reconciler/testing/objects_consumergroup.go @@ -248,3 +248,9 @@ func WithConfigmapOwnerRef(ownerref *metav1.OwnerReference) reconcilertesting.Co cg.ObjectMeta.OwnerReferences = []metav1.OwnerReference{*ownerref} } } + +func WithTopLevelResourceRef(ref *corev1.ObjectReference) ConsumerGroupOption { + return func(cg *kafkainternals.ConsumerGroup) { + cg.Spec.TopLevelResourceRef = ref + } +} diff --git a/control-plane/pkg/reconciler/trigger/v2/triggerv2.go b/control-plane/pkg/reconciler/trigger/v2/triggerv2.go index d933b17d64..e05eeaa48d 100644 --- a/control-plane/pkg/reconciler/trigger/v2/triggerv2.go +++ b/control-plane/pkg/reconciler/trigger/v2/triggerv2.go @@ -209,6 +209,13 @@ func (r *Reconciler) reconcileConsumerGroup(ctx context.Context, broker *eventin }, }, Spec: internalscg.ConsumerGroupSpec{ + TopLevelResourceRef: &corev1.ObjectReference{ + APIVersion: eventing.SchemeGroupVersion.String(), + Kind: "Broker", + Name: broker.Name, + Namespace: broker.Namespace, + UID: broker.UID, + }, Template: internalscg.ConsumerTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ diff --git a/control-plane/pkg/reconciler/trigger/v2/triggerv2_test.go b/control-plane/pkg/reconciler/trigger/v2/triggerv2_test.go index 1f87fd63c1..1b212b1e85 100644 --- a/control-plane/pkg/reconciler/trigger/v2/triggerv2_test.go +++ b/control-plane/pkg/reconciler/trigger/v2/triggerv2_test.go @@ -126,6 +126,7 @@ func TestReconcileKind(t *testing.T) { ConsumerFilters(NewConsumerSpecFilters()), ConsumerReply(ConsumerTopicReply()), )), + withBrokerTopLevelResourceRef(), ), }, WantEvents: []string{ @@ -187,6 +188,7 @@ func TestReconcileKind(t *testing.T) { ConsumerFilters(NewConsumerSpecFilters()), ConsumerReply(ConsumerTopicReply()), )), + withBrokerTopLevelResourceRef(), ), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{ @@ -241,6 +243,7 @@ func TestReconcileKind(t *testing.T) { ConsumerFilters(NewConsumerSpecFilters()), ConsumerReply(ConsumerTopicReply()), )), + withBrokerTopLevelResourceRef(), ), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{ @@ -296,6 +299,7 @@ func TestReconcileKind(t *testing.T) { ConsumerFilters(NewConsumerSpecFilters()), ConsumerReply(ConsumerTopicReply()), )), + withBrokerTopLevelResourceRef(), ), }, WantEvents: []string{ @@ -403,6 +407,7 @@ func TestReconcileKind(t *testing.T) { ConsumerReply(ConsumerTopicReply()), )), ConsumerGroupReady, + withBrokerTopLevelResourceRef(), ), }, }, @@ -448,6 +453,7 @@ func TestReconcileKind(t *testing.T) { WithConsumerGroupMetaLabels(OwnerAsTriggerLabel), WithConsumerGroupLabels(ConsumerTriggerLabel), ConsumerGroupReady, + withBrokerTopLevelResourceRef(), ), }, Key: testKey, @@ -472,6 +478,7 @@ func TestReconcileKind(t *testing.T) { ConsumerReply(ConsumerTopicReply()), )), ConsumerGroupReady, + withBrokerTopLevelResourceRef(), ), }, }, @@ -515,6 +522,7 @@ func TestReconcileKind(t *testing.T) { WithConsumerGroupOwnerRef(kmeta.NewControllerRef(newTrigger())), WithConsumerGroupMetaLabels(OwnerAsTriggerLabel), WithConsumerGroupLabels(ConsumerTriggerLabel), + withBrokerTopLevelResourceRef(), ), }, Key: testKey, @@ -537,6 +545,7 @@ func TestReconcileKind(t *testing.T) { ConsumerFilters(NewConsumerSpecFilters()), ConsumerReply(ConsumerTopicReply()), )), + withBrokerTopLevelResourceRef(), ), }, }, @@ -597,6 +606,7 @@ func TestReconcileKind(t *testing.T) { }, }), )), + withBrokerTopLevelResourceRef(), ), NewLegacySASLSecret(ConfigMapNamespace, "secret-1"), }, @@ -627,6 +637,7 @@ func TestReconcileKind(t *testing.T) { }, }), )), + withBrokerTopLevelResourceRef(), ), }, }, @@ -682,6 +693,7 @@ func TestReconcileKind(t *testing.T) { )), ConsumerGroupReady, ConsumerGroupReplicas(1), + withBrokerTopLevelResourceRef(), ), }, Key: testKey, @@ -736,6 +748,7 @@ func TestReconcileKind(t *testing.T) { ConsumerReply(ConsumerTopicReply()), )), ConsumerGroupReplicas(1), + withBrokerTopLevelResourceRef(), ), }, Key: testKey, @@ -791,6 +804,7 @@ func TestReconcileKind(t *testing.T) { )), WithConsumerGroupFailed("failed", "failed"), ConsumerGroupReplicas(1), + withBrokerTopLevelResourceRef(), ), }, Key: testKey, @@ -845,6 +859,7 @@ func TestReconcileKind(t *testing.T) { )), WithDeadLetterSinkURI(url.String()), ConsumerGroupReplicas(1), + withBrokerTopLevelResourceRef(), ), }, Key: testKey, @@ -992,6 +1007,7 @@ func TestReconcileKind(t *testing.T) { WithConsumerGroupMetaLabels(OwnerAsTriggerLabel), WithConsumerGroupLabels(ConsumerTriggerLabel), ConsumerGroupReady, + withBrokerTopLevelResourceRef(), ), }, Key: testKey, @@ -1015,6 +1031,7 @@ func TestReconcileKind(t *testing.T) { ConsumerReply(ConsumerTopicReply()), )), ConsumerGroupReady, + withBrokerTopLevelResourceRef(), ), }, }, @@ -1073,6 +1090,7 @@ func TestReconcileKind(t *testing.T) { )), ConsumerGroupReady, ConsumerGroupReplicas(1), + withBrokerTopLevelResourceRef(), ), }, Key: testKey, @@ -1172,3 +1190,13 @@ func removeFinalizers() clientgotesting.PatchActionImpl { action.Patch = []byte(patch) return action } + +func withBrokerTopLevelResourceRef() ConsumerGroupOption { + return WithTopLevelResourceRef(&corev1.ObjectReference{ + APIVersion: eventing.SchemeGroupVersion.String(), + Kind: "Broker", + Namespace: BrokerNamespace, + Name: BrokerName, + UID: BrokerUUID, + }) +}