Skip to content

Commit

Permalink
Set the name value for metrics tags to the correct top-level resource
Browse files Browse the repository at this point in the history
Previously to v2, we were setting `name` to the Broker or Channel
name of the resource.

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
  • Loading branch information
pierDipi committed Sep 25, 2024
1 parent f9f799c commit 16393a7
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package v1alpha1
import (
"strings"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -108,6 +109,10 @@ type ConsumerGroupSpec struct {
// OIDCServiceAccountName is the name of service account used for this components
// OIDC authentication.
OIDCServiceAccountName *string `json:"oidcServiceAccountName,omitempty"`

// TopLevelResourceRef is a reference to a top level resource.
// For a ConsumerGroup associated with a Trigger, a Broker reference will be set.
TopLevelResourceRef *corev1.ObjectReference `json:"topLevelResourceRef,omitempty"`
}

type ConsumerGroupStatus struct {
Expand Down Expand Up @@ -210,6 +215,13 @@ func (cg *ConsumerGroup) GetUserFacingResourceRef() *metav1.OwnerReference {
return nil
}

// GetTopLevelUserFacingResourceRef gets the top level resource reference to the user-facing resources
// that are backed by this ConsumerGroup using the OwnerReference list.
// For example, for a Trigger, it will return a Broker reference.
func (cg *ConsumerGroup) GetTopLevelUserFacingResourceRef() *corev1.ObjectReference {
return cg.Spec.TopLevelResourceRef

Check warning on line 222 in control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go#L221-L222

Added lines #L221 - L222 were not covered by tests
}

func (cg *ConsumerGroup) IsNotScheduled() bool {
// We want to return true when:
// - the condition isn't present, or
Expand Down
12 changes: 10 additions & 2 deletions control-plane/pkg/reconciler/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
corelisters "k8s.io/client-go/listers/core/v1"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/channel/resources"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/pkg/network"
"knative.dev/pkg/resolver"
"knative.dev/pkg/system"

"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/channel/resources"

v1 "knative.dev/eventing/pkg/apis/duck/v1"
messaging "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/pkg/apis"
Expand Down Expand Up @@ -602,6 +603,13 @@ func (r *Reconciler) reconcileConsumerGroup(ctx context.Context, channel *messag
},
},
Spec: internalscg.ConsumerGroupSpec{
TopLevelResourceRef: &corev1.ObjectReference{
APIVersion: messagingv1beta1.SchemeGroupVersion.String(),
Kind: "KafkaChannel",
Name: channel.Name,
Namespace: channel.Namespace,
UID: channel.UID,
},
Template: internalscg.ConsumerTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
Expand Down
29 changes: 27 additions & 2 deletions control-plane/pkg/reconciler/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,12 @@ import (
messagingv1beta1kafkachannelreconciler "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel"

"github.com/rickb777/date/period"
eventingrekttesting "knative.dev/eventing/pkg/reconciler/testing/v1"
reconcilertesting "knative.dev/eventing/pkg/reconciler/testing/v1"

internalscg "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1"
kafkainternals "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1"
fakeconsumergroupinformer "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/client/fake"
eventingrekttesting "knative.dev/eventing/pkg/reconciler/testing/v1"
reconcilertesting "knative.dev/eventing/pkg/reconciler/testing/v1"
)

const (
Expand Down Expand Up @@ -478,6 +479,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription1URI)),
ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))),
)),
withChannelTopLevelResourceRef(),
),
},
WantUpdates: []clientgotesting.UpdateActionImpl{
Expand Down Expand Up @@ -551,6 +553,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription1URI)),
ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))),
)),
withChannelTopLevelResourceRef(),
),
},
WantUpdates: []clientgotesting.UpdateActionImpl{
Expand Down Expand Up @@ -625,6 +628,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription1URI)),
ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))),
)),
withChannelTopLevelResourceRef(),
),
},
WantUpdates: []clientgotesting.UpdateActionImpl{
Expand Down Expand Up @@ -684,6 +688,7 @@ func TestReconcileKind(t *testing.T) {
WithConsumerGroupOwnerRef(kmeta.NewControllerRef(NewChannel())),
WithConsumerGroupMetaLabels(OwnerAsChannelLabel),
ConsumerGroupReady,
withChannelTopLevelResourceRef(),
),
},
Key: testKey,
Expand Down Expand Up @@ -725,6 +730,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))),
)),
ConsumerGroupReady,
withChannelTopLevelResourceRef(),
),
},
},
Expand Down Expand Up @@ -779,6 +785,7 @@ func TestReconcileKind(t *testing.T) {
)),
ConsumerGroupReplicas(1),
WithConsumerGroupFailed("failed to reconcile consumer group,", "internal error"),
withChannelTopLevelResourceRef(),
),
},
Key: testKey,
Expand Down Expand Up @@ -858,6 +865,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription1URI)),
ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))),
)),
withChannelTopLevelResourceRef(),
),
NewConsumerGroup(
WithConsumerGroupName(Subscription2UUID),
Expand All @@ -875,6 +883,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription2URI)),
ConsumerReply(ConsumerNoReply()),
)),
withChannelTopLevelResourceRef(),
),
},
WantUpdates: []clientgotesting.UpdateActionImpl{
Expand Down Expand Up @@ -946,6 +955,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerDelivery(NewConsumerSpecDelivery(kafkasource.Ordered)),
ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription2URI)),
)),
withChannelTopLevelResourceRef(),
),
},
Key: testKey,
Expand All @@ -967,6 +977,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription1URI)),
ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))),
)),
withChannelTopLevelResourceRef(),
),
},
WantUpdates: []clientgotesting.UpdateActionImpl{
Expand Down Expand Up @@ -1218,6 +1229,7 @@ func TestReconcileKind(t *testing.T) {
)),
ConsumerGroupReplicas(1),
ConsumerGroupReady,
withChannelTopLevelResourceRef(),
),
},
Key: testKey,
Expand Down Expand Up @@ -1324,6 +1336,7 @@ func TestReconcileKind(t *testing.T) {
)),
ConsumerGroupReplicas(1),
ConsumerGroupReady,
withChannelTopLevelResourceRef(),
),
},
Key: testKey,
Expand Down Expand Up @@ -1429,6 +1442,7 @@ func TestReconcileKind(t *testing.T) {
)),
ConsumerGroupReplicas(1),
ConsumerGroupReady,
withChannelTopLevelResourceRef(),
),
},
Key: testKey,
Expand Down Expand Up @@ -1528,6 +1542,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription1URI)),
ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))),
)),
withChannelTopLevelResourceRef(),
),
},
WantUpdates: []clientgotesting.UpdateActionImpl{
Expand Down Expand Up @@ -2430,3 +2445,13 @@ func httpsURL(name string, namespace string) *apis.URL {
Path: fmt.Sprintf("/%s/%s", namespace, name),
}
}

func withChannelTopLevelResourceRef() ConsumerGroupOption {
return WithTopLevelResourceRef(&corev1.ObjectReference{
APIVersion: messagingv1beta.SchemeGroupVersion.String(),
Kind: "KafkaChannel",
Namespace: ChannelNamespace,
Name: ChannelName,
UID: ChannelUUID,
})
}
35 changes: 34 additions & 1 deletion control-plane/pkg/reconciler/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,22 @@ func (r *Reconciler) reconcileContractResource(ctx context.Context, c *kafkainte
egress.VReplicas = 1
}

topLevelUserFacingResourceRef, err := r.reconcileTopLevelUserFacingResourceRef(c)
if err != nil {
return nil, fmt.Errorf("failed to reconcile top-level user facing resource reference: %w", err)

Check warning on line 136 in control-plane/pkg/reconciler/consumer/consumer.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/consumer/consumer.go#L136

Added line #L136 was not covered by tests
}
if topLevelUserFacingResourceRef == nil {
topLevelUserFacingResourceRef = userFacingResourceRef
}

resource := &contract.Resource{
Uid: string(c.UID),
Topics: c.Spec.Topics,
BootstrapServers: c.Spec.Configs.Configs["bootstrap.servers"],
Egresses: []*contract.Egress{egress},
Auth: nil, // Auth will be added by reconcileAuth
CloudEventOverrides: reconcileCEOverrides(c),
Reference: userFacingResourceRef,
Reference: topLevelUserFacingResourceRef,
FeatureFlags: &contract.FeatureFlags{
EnableEventTypeAutocreate: feature.FromContext(ctx).IsEnabled(feature.EvenTypeAutoCreate),
},
Expand Down Expand Up @@ -303,6 +311,31 @@ func (r *Reconciler) reconcileUserFacingResourceRef(c *kafkainternals.Consumer)
return ref, nil
}

func (r *Reconciler) reconcileTopLevelUserFacingResourceRef(c *kafkainternals.Consumer) (*contract.Reference, error) {

cg, err := r.ConsumerGroupLister.ConsumerGroups(c.GetNamespace()).Get(c.GetConsumerGroup().Name)
if apierrors.IsNotFound(err) {
return nil, nil

Check warning on line 318 in control-plane/pkg/reconciler/consumer/consumer.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/consumer/consumer.go#L318

Added line #L318 was not covered by tests
}
if err != nil {
return nil, fmt.Errorf("failed to get %s: %w", kafkainternals.ConsumerGroupGroupVersionKind.Kind, err)

Check warning on line 321 in control-plane/pkg/reconciler/consumer/consumer.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/consumer/consumer.go#L321

Added line #L321 was not covered by tests
}

userFacingResource := cg.GetTopLevelUserFacingResourceRef()
if userFacingResource == nil {
return nil, nil
}

ref := &contract.Reference{
Uuid: string(userFacingResource.UID),
Namespace: c.GetNamespace(),
Name: userFacingResource.Name,
Kind: userFacingResource.Kind,
GroupVersion: userFacingResource.APIVersion,

Check warning on line 334 in control-plane/pkg/reconciler/consumer/consumer.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/consumer/consumer.go#L329-L334

Added lines #L329 - L334 were not covered by tests
}
return ref, nil

Check warning on line 336 in control-plane/pkg/reconciler/consumer/consumer.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/consumer/consumer.go#L336

Added line #L336 was not covered by tests
}

func reconcileDeliveryOrder(c *kafkainternals.Consumer) contract.DeliveryOrder {
if c.Spec.Delivery == nil {
return contract.DeliveryOrder_UNORDERED
Expand Down
6 changes: 6 additions & 0 deletions control-plane/pkg/reconciler/testing/objects_consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,3 +248,9 @@ func WithConfigmapOwnerRef(ownerref *metav1.OwnerReference) reconcilertesting.Co
cg.ObjectMeta.OwnerReferences = []metav1.OwnerReference{*ownerref}
}
}

func WithTopLevelResourceRef(ref *corev1.ObjectReference) ConsumerGroupOption {
return func(cg *kafkainternals.ConsumerGroup) {
cg.Spec.TopLevelResourceRef = ref
}
}
7 changes: 7 additions & 0 deletions control-plane/pkg/reconciler/trigger/v2/triggerv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,13 @@ func (r *Reconciler) reconcileConsumerGroup(ctx context.Context, broker *eventin
},
},
Spec: internalscg.ConsumerGroupSpec{
TopLevelResourceRef: &corev1.ObjectReference{
APIVersion: eventing.SchemeGroupVersion.String(),
Kind: "Broker",
Name: broker.Name,
Namespace: broker.Namespace,
UID: broker.UID,
},
Template: internalscg.ConsumerTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
Expand Down
Loading

0 comments on commit 16393a7

Please sign in to comment.