diff --git a/control-plane/pkg/reconciler/consumer/consumer.go b/control-plane/pkg/reconciler/consumer/consumer.go index 276c7010b4..4589758ee2 100644 --- a/control-plane/pkg/reconciler/consumer/consumer.go +++ b/control-plane/pkg/reconciler/consumer/consumer.go @@ -20,9 +20,10 @@ import ( "context" "errors" "fmt" - "knative.dev/eventing/pkg/apis/feature" "strings" + "knative.dev/eventing/pkg/apis/feature" + "go.uber.org/zap" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -293,9 +294,11 @@ func (r *Reconciler) reconcileUserFacingResourceRef(c *kafkainternals.Consumer) userFacingResource := cg.GetUserFacingResourceRef() ref := &contract.Reference{ - Uuid: string(userFacingResource.UID), - Namespace: c.GetNamespace(), - Name: userFacingResource.Name, + Uuid: string(userFacingResource.UID), + Namespace: c.GetNamespace(), + Name: userFacingResource.Name, + Kind: userFacingResource.Kind, + GroupVersion: userFacingResource.APIVersion, } return ref, nil } diff --git a/control-plane/pkg/reconciler/consumer/consumer_test.go b/control-plane/pkg/reconciler/consumer/consumer_test.go index ef208a397e..1fdc3021b2 100644 --- a/control-plane/pkg/reconciler/consumer/consumer_test.go +++ b/control-plane/pkg/reconciler/consumer/consumer_test.go @@ -62,6 +62,8 @@ var ( DefaultEnv = &config.Env{ SystemNamespace: "knative-eventing", } + + SourceKind = "KafkaSource" ) func TestReconcileKind(t *testing.T) { @@ -119,18 +121,22 @@ func TestReconcileKind(t *testing.T) { KeyType: 0, VReplicas: 1, Reference: &contract.Reference{ - Uuid: SourceUUID, - Namespace: ConsumerNamespace, - Name: SourceName, + Uuid: SourceUUID, + Namespace: ConsumerNamespace, + Name: SourceName, + Kind: SourceKind, + GroupVersion: kafkasource.SchemeGroupVersion.String(), }, FeatureFlags: defaultContractFeatureFlags, }}, Auth: nil, CloudEventOverrides: nil, Reference: &contract.Reference{ - Uuid: SourceUUID, - Namespace: ConsumerNamespace, - Name: SourceName, + Uuid: SourceUUID, + Namespace: ConsumerNamespace, + Name: SourceName, + Kind: SourceKind, + GroupVersion: kafkasource.SchemeGroupVersion.String(), }, FeatureFlags: FeatureFlagsETAutocreate(false), }, @@ -222,18 +228,22 @@ func TestReconcileKind(t *testing.T) { KeyType: contract.KeyType_Integer, VReplicas: 1, Reference: &contract.Reference{ - Uuid: SourceUUID, - Namespace: ConsumerNamespace, - Name: SourceName, + Uuid: SourceUUID, + Namespace: ConsumerNamespace, + Name: SourceName, + Kind: SourceKind, + GroupVersion: kafkasource.SchemeGroupVersion.String(), }, FeatureFlags: defaultContractFeatureFlags, }}, Auth: nil, CloudEventOverrides: nil, Reference: &contract.Reference{ - Uuid: SourceUUID, - Namespace: ConsumerNamespace, - Name: SourceName, + Uuid: SourceUUID, + Namespace: ConsumerNamespace, + Name: SourceName, + Kind: SourceKind, + GroupVersion: kafkasource.SchemeGroupVersion.String(), }, FeatureFlags: FeatureFlagsETAutocreate(false), }, @@ -325,18 +335,22 @@ func TestReconcileKind(t *testing.T) { KeyType: 0, VReplicas: 2, Reference: &contract.Reference{ - Uuid: SourceUUID, - Namespace: ConsumerNamespace, - Name: SourceName, + Uuid: SourceUUID, + Namespace: ConsumerNamespace, + Name: SourceName, + Kind: SourceKind, + GroupVersion: kafkasource.SchemeGroupVersion.String(), }, FeatureFlags: defaultContractFeatureFlags, }}, Auth: nil, CloudEventOverrides: nil, Reference: &contract.Reference{ - Uuid: SourceUUID, - Namespace: ConsumerNamespace, - Name: SourceName, + Uuid: SourceUUID, + Namespace: ConsumerNamespace, + Name: SourceName, + Kind: SourceKind, + GroupVersion: kafkasource.SchemeGroupVersion.String(), }, FeatureFlags: FeatureFlagsETAutocreate(false), }, @@ -441,18 +455,22 @@ func TestReconcileKind(t *testing.T) { DeliveryOrder: contract.DeliveryOrder_ORDERED, KeyType: 0, Reference: &contract.Reference{ - Uuid: SourceUUID, - Namespace: ConsumerNamespace, - Name: SourceName, + Uuid: SourceUUID, + Namespace: ConsumerNamespace, + Name: SourceName, + Kind: SourceKind, + GroupVersion: kafkasource.SchemeGroupVersion.String(), }, FeatureFlags: defaultContractFeatureFlags, }}, Auth: nil, CloudEventOverrides: nil, Reference: &contract.Reference{ - Uuid: SourceUUID, - Namespace: ConsumerNamespace, - Name: SourceName, + Uuid: SourceUUID, + Namespace: ConsumerNamespace, + Name: SourceName, + Kind: SourceKind, + GroupVersion: kafkasource.SchemeGroupVersion.String(), }, FeatureFlags: FeatureFlagsETAutocreate(false), }, @@ -858,18 +876,22 @@ func TestReconcileKind(t *testing.T) { KeyType: 0, VReplicas: 1, Reference: &contract.Reference{ - Uuid: SourceUUID, - Namespace: ConsumerNamespace, - Name: SourceName, + Uuid: SourceUUID, + Namespace: ConsumerNamespace, + Name: SourceName, + Kind: SourceKind, + GroupVersion: kafkasource.SchemeGroupVersion.String(), }, FeatureFlags: defaultContractFeatureFlags, }}, Auth: nil, CloudEventOverrides: nil, Reference: &contract.Reference{ - Uuid: SourceUUID, - Namespace: ConsumerNamespace, - Name: SourceName, + Uuid: SourceUUID, + Namespace: ConsumerNamespace, + Name: SourceName, + Kind: SourceKind, + GroupVersion: kafkasource.SchemeGroupVersion.String(), }, FeatureFlags: FeatureFlagsETAutocreate(false), }, diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/eventtype/EventTypeCreatorImpl.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/eventtype/EventTypeCreatorImpl.java index ec38ee52ff..d836c1d5c0 100644 --- a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/eventtype/EventTypeCreatorImpl.java +++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/eventtype/EventTypeCreatorImpl.java @@ -29,11 +29,15 @@ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import org.apache.commons.codec.binary.Hex; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class EventTypeCreatorImpl implements EventTypeCreator { private static final Integer DNS1123_SUBDOMAIN_MAX_LENGTH = 253; + private static final Logger logger = LoggerFactory.getLogger(EventTypeCreatorImpl.class); + private final MixedOperation, Resource> eventTypeClient; private MessageDigest messageDigest; @@ -66,8 +70,10 @@ public Future create( CloudEvent event, Lister eventTypeLister, DataPlaneContract.Reference ownerReference) { return this.executor.executeBlocking(() -> { final var name = this.getName(event, ownerReference); + logger.debug("attempting to autocreate eventtype {} for {}", name, ownerReference); final var eventType = eventTypeLister.get(name); if (eventType != null) { + logger.debug("eventtype already exists"); return eventType; } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/BaseResponseHandler.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/BaseResponseHandler.java index c8c10992e5..e63ada268a 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/BaseResponseHandler.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/BaseResponseHandler.java @@ -48,6 +48,7 @@ public BaseResponseHandler withEventTypeAutocreate( EventTypeCreator eventTypeCreator, Lister eventTypeLister, DataPlaneContract.Reference reference) { + logger.info("cali0707: enabling eventtype autocreate"); this.eventTypeCreator = eventTypeCreator; this.eventTypeLister = eventTypeLister; this.reference = reference; @@ -101,7 +102,17 @@ public Future handle(HttpResponse response) { TracingSpan.decorateCurrentWithEvent(event); if (this.isEventTypeAutocreateEnabled) { - this.eventTypeCreator.create(event, this.eventTypeLister, this.reference); + return this.doHandleEvent(event).compose((ignored) -> this.eventTypeCreator + .create(event, this.eventTypeLister, this.reference) + .compose( + eventType -> { + logger.debug("successfully created eventtype {}", eventType); + return Future.succeededFuture(); + }, + cause -> { + logger.warn("failed to create eventtype", cause); + return Future.failedFuture(cause); + })); } return this.doHandleEvent(event); }