Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set the name value for metrics tags to the correct top-level resource #4120

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import (
"strings"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -108,6 +109,10 @@
// OIDCServiceAccountName is the name of service account used for this components
// OIDC authentication.
OIDCServiceAccountName *string `json:"oidcServiceAccountName,omitempty"`

// TopLevelResourceRef is a reference to a top level resource.
// For a ConsumerGroup associated with a Trigger, a Broker reference will be set.
TopLevelResourceRef *corev1.ObjectReference `json:"topLevelResourceRef,omitempty"`
}

type ConsumerGroupStatus struct {
Expand Down Expand Up @@ -210,6 +215,13 @@
return nil
}

// GetTopLevelUserFacingResourceRef gets the top level resource reference to the user-facing resources
// that are backed by this ConsumerGroup using the OwnerReference list.
// For example, for a Trigger, it will return a Broker reference.
func (cg *ConsumerGroup) GetTopLevelUserFacingResourceRef() *corev1.ObjectReference {
return cg.Spec.TopLevelResourceRef

Check warning on line 222 in control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_types.go#L221-L222

Added lines #L221 - L222 were not covered by tests
}

func (cg *ConsumerGroup) IsNotScheduled() bool {
// We want to return true when:
// - the condition isn't present, or
Expand Down
12 changes: 10 additions & 2 deletions control-plane/pkg/reconciler/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
corelisters "k8s.io/client-go/listers/core/v1"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/channel/resources"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/pkg/network"
"knative.dev/pkg/resolver"
"knative.dev/pkg/system"

"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/channel/resources"

v1 "knative.dev/eventing/pkg/apis/duck/v1"
messaging "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/pkg/apis"
Expand Down Expand Up @@ -602,6 +603,13 @@ func (r *Reconciler) reconcileConsumerGroup(ctx context.Context, channel *messag
},
},
Spec: internalscg.ConsumerGroupSpec{
TopLevelResourceRef: &corev1.ObjectReference{
APIVersion: messagingv1beta1.SchemeGroupVersion.String(),
Kind: "KafkaChannel",
Name: channel.Name,
Namespace: channel.Namespace,
UID: channel.UID,
},
Template: internalscg.ConsumerTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
Expand Down
29 changes: 27 additions & 2 deletions control-plane/pkg/reconciler/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,12 @@ import (
messagingv1beta1kafkachannelreconciler "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel"

"github.com/rickb777/date/period"
eventingrekttesting "knative.dev/eventing/pkg/reconciler/testing/v1"
reconcilertesting "knative.dev/eventing/pkg/reconciler/testing/v1"

internalscg "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1"
kafkainternals "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1"
fakeconsumergroupinformer "knative.dev/eventing-kafka-broker/control-plane/pkg/client/internals/kafka/injection/client/fake"
eventingrekttesting "knative.dev/eventing/pkg/reconciler/testing/v1"
reconcilertesting "knative.dev/eventing/pkg/reconciler/testing/v1"
)

const (
Expand Down Expand Up @@ -478,6 +479,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription1URI)),
ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))),
)),
withChannelTopLevelResourceRef(),
),
},
WantUpdates: []clientgotesting.UpdateActionImpl{
Expand Down Expand Up @@ -551,6 +553,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription1URI)),
ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))),
)),
withChannelTopLevelResourceRef(),
),
},
WantUpdates: []clientgotesting.UpdateActionImpl{
Expand Down Expand Up @@ -625,6 +628,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription1URI)),
ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))),
)),
withChannelTopLevelResourceRef(),
),
},
WantUpdates: []clientgotesting.UpdateActionImpl{
Expand Down Expand Up @@ -684,6 +688,7 @@ func TestReconcileKind(t *testing.T) {
WithConsumerGroupOwnerRef(kmeta.NewControllerRef(NewChannel())),
WithConsumerGroupMetaLabels(OwnerAsChannelLabel),
ConsumerGroupReady,
withChannelTopLevelResourceRef(),
),
},
Key: testKey,
Expand Down Expand Up @@ -725,6 +730,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))),
)),
ConsumerGroupReady,
withChannelTopLevelResourceRef(),
),
},
},
Expand Down Expand Up @@ -779,6 +785,7 @@ func TestReconcileKind(t *testing.T) {
)),
ConsumerGroupReplicas(1),
WithConsumerGroupFailed("failed to reconcile consumer group,", "internal error"),
withChannelTopLevelResourceRef(),
),
},
Key: testKey,
Expand Down Expand Up @@ -858,6 +865,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription1URI)),
ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))),
)),
withChannelTopLevelResourceRef(),
),
NewConsumerGroup(
WithConsumerGroupName(Subscription2UUID),
Expand All @@ -875,6 +883,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription2URI)),
ConsumerReply(ConsumerNoReply()),
)),
withChannelTopLevelResourceRef(),
),
},
WantUpdates: []clientgotesting.UpdateActionImpl{
Expand Down Expand Up @@ -946,6 +955,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerDelivery(NewConsumerSpecDelivery(kafkasource.Ordered)),
ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription2URI)),
)),
withChannelTopLevelResourceRef(),
),
},
Key: testKey,
Expand All @@ -967,6 +977,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription1URI)),
ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))),
)),
withChannelTopLevelResourceRef(),
),
},
WantUpdates: []clientgotesting.UpdateActionImpl{
Expand Down Expand Up @@ -1218,6 +1229,7 @@ func TestReconcileKind(t *testing.T) {
)),
ConsumerGroupReplicas(1),
ConsumerGroupReady,
withChannelTopLevelResourceRef(),
),
},
Key: testKey,
Expand Down Expand Up @@ -1324,6 +1336,7 @@ func TestReconcileKind(t *testing.T) {
)),
ConsumerGroupReplicas(1),
ConsumerGroupReady,
withChannelTopLevelResourceRef(),
),
},
Key: testKey,
Expand Down Expand Up @@ -1429,6 +1442,7 @@ func TestReconcileKind(t *testing.T) {
)),
ConsumerGroupReplicas(1),
ConsumerGroupReady,
withChannelTopLevelResourceRef(),
),
},
Key: testKey,
Expand Down Expand Up @@ -1528,6 +1542,7 @@ func TestReconcileKind(t *testing.T) {
ConsumerSubscriber(NewConsumerSpecSubscriber(Subscription1URI)),
ConsumerReply(ConsumerUrlReply(apis.HTTP(Subscription1ReplyURI))),
)),
withChannelTopLevelResourceRef(),
),
},
WantUpdates: []clientgotesting.UpdateActionImpl{
Expand Down Expand Up @@ -2430,3 +2445,13 @@ func httpsURL(name string, namespace string) *apis.URL {
Path: fmt.Sprintf("/%s/%s", namespace, name),
}
}

func withChannelTopLevelResourceRef() ConsumerGroupOption {
return WithTopLevelResourceRef(&corev1.ObjectReference{
APIVersion: messagingv1beta.SchemeGroupVersion.String(),
Kind: "KafkaChannel",
Namespace: ChannelNamespace,
Name: ChannelName,
UID: ChannelUUID,
})
}
35 changes: 34 additions & 1 deletion control-plane/pkg/reconciler/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,22 @@
egress.VReplicas = 1
}

topLevelUserFacingResourceRef, err := r.reconcileTopLevelUserFacingResourceRef(c)
if err != nil {
return nil, fmt.Errorf("failed to reconcile top-level user facing resource reference: %w", err)

Check warning on line 136 in control-plane/pkg/reconciler/consumer/consumer.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/consumer/consumer.go#L136

Added line #L136 was not covered by tests
}
if topLevelUserFacingResourceRef == nil {
topLevelUserFacingResourceRef = userFacingResourceRef
}

resource := &contract.Resource{
Uid: string(c.UID),
Topics: c.Spec.Topics,
BootstrapServers: c.Spec.Configs.Configs["bootstrap.servers"],
Egresses: []*contract.Egress{egress},
Auth: nil, // Auth will be added by reconcileAuth
CloudEventOverrides: reconcileCEOverrides(c),
Reference: userFacingResourceRef,
Reference: topLevelUserFacingResourceRef,
FeatureFlags: &contract.FeatureFlags{
EnableEventTypeAutocreate: feature.FromContext(ctx).IsEnabled(feature.EvenTypeAutoCreate),
},
Expand Down Expand Up @@ -303,6 +311,31 @@
return ref, nil
}

func (r *Reconciler) reconcileTopLevelUserFacingResourceRef(c *kafkainternals.Consumer) (*contract.Reference, error) {

cg, err := r.ConsumerGroupLister.ConsumerGroups(c.GetNamespace()).Get(c.GetConsumerGroup().Name)
if apierrors.IsNotFound(err) {
return nil, nil

Check warning on line 318 in control-plane/pkg/reconciler/consumer/consumer.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/consumer/consumer.go#L318

Added line #L318 was not covered by tests
}
if err != nil {
return nil, fmt.Errorf("failed to get %s: %w", kafkainternals.ConsumerGroupGroupVersionKind.Kind, err)

Check warning on line 321 in control-plane/pkg/reconciler/consumer/consumer.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/consumer/consumer.go#L321

Added line #L321 was not covered by tests
}

userFacingResource := cg.GetTopLevelUserFacingResourceRef()
if userFacingResource == nil {
return nil, nil
}

ref := &contract.Reference{
Uuid: string(userFacingResource.UID),
Namespace: c.GetNamespace(),
Name: userFacingResource.Name,
Kind: userFacingResource.Kind,
GroupVersion: userFacingResource.APIVersion,

Check warning on line 334 in control-plane/pkg/reconciler/consumer/consumer.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/consumer/consumer.go#L329-L334

Added lines #L329 - L334 were not covered by tests
}
return ref, nil

Check warning on line 336 in control-plane/pkg/reconciler/consumer/consumer.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/consumer/consumer.go#L336

Added line #L336 was not covered by tests
}

func reconcileDeliveryOrder(c *kafkainternals.Consumer) contract.DeliveryOrder {
if c.Spec.Delivery == nil {
return contract.DeliveryOrder_UNORDERED
Expand Down
6 changes: 6 additions & 0 deletions control-plane/pkg/reconciler/testing/objects_consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,3 +248,9 @@ func WithConfigmapOwnerRef(ownerref *metav1.OwnerReference) reconcilertesting.Co
cg.ObjectMeta.OwnerReferences = []metav1.OwnerReference{*ownerref}
}
}

func WithTopLevelResourceRef(ref *corev1.ObjectReference) ConsumerGroupOption {
return func(cg *kafkainternals.ConsumerGroup) {
cg.Spec.TopLevelResourceRef = ref
}
}
7 changes: 7 additions & 0 deletions control-plane/pkg/reconciler/trigger/v2/triggerv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,13 @@ func (r *Reconciler) reconcileConsumerGroup(ctx context.Context, broker *eventin
},
},
Spec: internalscg.ConsumerGroupSpec{
TopLevelResourceRef: &corev1.ObjectReference{
APIVersion: eventing.SchemeGroupVersion.String(),
Kind: "Broker",
Name: broker.Name,
Namespace: broker.Namespace,
UID: broker.UID,
},
Template: internalscg.ConsumerTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
Expand Down
Loading
Loading