Skip to content

Commit

Permalink
fix: reference is correctly provisioned for dispatchers in control pl…
Browse files Browse the repository at this point in the history
…ance

Signed-off-by: Calum Murray <cmurray@redhat.com>
  • Loading branch information
Cali0707 committed Aug 22, 2024
1 parent 48f2554 commit 7192915
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 35 deletions.
11 changes: 7 additions & 4 deletions control-plane/pkg/reconciler/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ import (
"context"
"errors"
"fmt"
"knative.dev/eventing/pkg/apis/feature"
"strings"

"knative.dev/eventing/pkg/apis/feature"

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -293,9 +294,11 @@ func (r *Reconciler) reconcileUserFacingResourceRef(c *kafkainternals.Consumer)

userFacingResource := cg.GetUserFacingResourceRef()
ref := &contract.Reference{
Uuid: string(userFacingResource.UID),
Namespace: c.GetNamespace(),
Name: userFacingResource.Name,
Uuid: string(userFacingResource.UID),
Namespace: c.GetNamespace(),
Name: userFacingResource.Name,
Kind: userFacingResource.Kind,
GroupVersion: userFacingResource.APIVersion,
}
return ref, nil
}
Expand Down
82 changes: 52 additions & 30 deletions control-plane/pkg/reconciler/consumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ var (
DefaultEnv = &config.Env{
SystemNamespace: "knative-eventing",
}

SourceKind = "KafkaSource"
)

func TestReconcileKind(t *testing.T) {
Expand Down Expand Up @@ -119,18 +121,22 @@ func TestReconcileKind(t *testing.T) {
KeyType: 0,
VReplicas: 1,
Reference: &contract.Reference{
Uuid: SourceUUID,
Namespace: ConsumerNamespace,
Name: SourceName,
Uuid: SourceUUID,
Namespace: ConsumerNamespace,
Name: SourceName,
Kind: SourceKind,
GroupVersion: kafkasource.SchemeGroupVersion.String(),
},
FeatureFlags: defaultContractFeatureFlags,
}},
Auth: nil,
CloudEventOverrides: nil,
Reference: &contract.Reference{
Uuid: SourceUUID,
Namespace: ConsumerNamespace,
Name: SourceName,
Uuid: SourceUUID,
Namespace: ConsumerNamespace,
Name: SourceName,
Kind: SourceKind,
GroupVersion: kafkasource.SchemeGroupVersion.String(),
},
FeatureFlags: FeatureFlagsETAutocreate(false),
},
Expand Down Expand Up @@ -222,18 +228,22 @@ func TestReconcileKind(t *testing.T) {
KeyType: contract.KeyType_Integer,
VReplicas: 1,
Reference: &contract.Reference{
Uuid: SourceUUID,
Namespace: ConsumerNamespace,
Name: SourceName,
Uuid: SourceUUID,
Namespace: ConsumerNamespace,
Name: SourceName,
Kind: SourceKind,
GroupVersion: kafkasource.SchemeGroupVersion.String(),
},
FeatureFlags: defaultContractFeatureFlags,
}},
Auth: nil,
CloudEventOverrides: nil,
Reference: &contract.Reference{
Uuid: SourceUUID,
Namespace: ConsumerNamespace,
Name: SourceName,
Uuid: SourceUUID,
Namespace: ConsumerNamespace,
Name: SourceName,
Kind: SourceKind,
GroupVersion: kafkasource.SchemeGroupVersion.String(),
},
FeatureFlags: FeatureFlagsETAutocreate(false),
},
Expand Down Expand Up @@ -325,18 +335,22 @@ func TestReconcileKind(t *testing.T) {
KeyType: 0,
VReplicas: 2,
Reference: &contract.Reference{
Uuid: SourceUUID,
Namespace: ConsumerNamespace,
Name: SourceName,
Uuid: SourceUUID,
Namespace: ConsumerNamespace,
Name: SourceName,
Kind: SourceKind,
GroupVersion: kafkasource.SchemeGroupVersion.String(),
},
FeatureFlags: defaultContractFeatureFlags,
}},
Auth: nil,
CloudEventOverrides: nil,
Reference: &contract.Reference{
Uuid: SourceUUID,
Namespace: ConsumerNamespace,
Name: SourceName,
Uuid: SourceUUID,
Namespace: ConsumerNamespace,
Name: SourceName,
Kind: SourceKind,
GroupVersion: kafkasource.SchemeGroupVersion.String(),
},
FeatureFlags: FeatureFlagsETAutocreate(false),
},
Expand Down Expand Up @@ -441,18 +455,22 @@ func TestReconcileKind(t *testing.T) {
DeliveryOrder: contract.DeliveryOrder_ORDERED,
KeyType: 0,
Reference: &contract.Reference{
Uuid: SourceUUID,
Namespace: ConsumerNamespace,
Name: SourceName,
Uuid: SourceUUID,
Namespace: ConsumerNamespace,
Name: SourceName,
Kind: SourceKind,
GroupVersion: kafkasource.SchemeGroupVersion.String(),
},
FeatureFlags: defaultContractFeatureFlags,
}},
Auth: nil,
CloudEventOverrides: nil,
Reference: &contract.Reference{
Uuid: SourceUUID,
Namespace: ConsumerNamespace,
Name: SourceName,
Uuid: SourceUUID,
Namespace: ConsumerNamespace,
Name: SourceName,
Kind: SourceKind,
GroupVersion: kafkasource.SchemeGroupVersion.String(),
},
FeatureFlags: FeatureFlagsETAutocreate(false),
},
Expand Down Expand Up @@ -858,18 +876,22 @@ func TestReconcileKind(t *testing.T) {
KeyType: 0,
VReplicas: 1,
Reference: &contract.Reference{
Uuid: SourceUUID,
Namespace: ConsumerNamespace,
Name: SourceName,
Uuid: SourceUUID,
Namespace: ConsumerNamespace,
Name: SourceName,
Kind: SourceKind,
GroupVersion: kafkasource.SchemeGroupVersion.String(),
},
FeatureFlags: defaultContractFeatureFlags,
}},
Auth: nil,
CloudEventOverrides: nil,
Reference: &contract.Reference{
Uuid: SourceUUID,
Namespace: ConsumerNamespace,
Name: SourceName,
Uuid: SourceUUID,
Namespace: ConsumerNamespace,
Name: SourceName,
Kind: SourceKind,
GroupVersion: kafkasource.SchemeGroupVersion.String(),
},
FeatureFlags: FeatureFlagsETAutocreate(false),
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,15 @@
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import org.apache.commons.codec.binary.Hex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventTypeCreatorImpl implements EventTypeCreator {

private static final Integer DNS1123_SUBDOMAIN_MAX_LENGTH = 253;

private static final Logger logger = LoggerFactory.getLogger(EventTypeCreatorImpl.class);

private final MixedOperation<EventType, KubernetesResourceList<EventType>, Resource<EventType>> eventTypeClient;

private MessageDigest messageDigest;
Expand Down Expand Up @@ -66,8 +70,10 @@ public Future<EventType> create(
CloudEvent event, Lister<EventType> eventTypeLister, DataPlaneContract.Reference ownerReference) {
return this.executor.executeBlocking(() -> {
final var name = this.getName(event, ownerReference);
logger.debug("attempting to autocreate eventtype {} for {}", name, ownerReference);
final var eventType = eventTypeLister.get(name);
if (eventType != null) {
logger.debug("eventtype already exists");
return eventType;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public BaseResponseHandler withEventTypeAutocreate(
EventTypeCreator eventTypeCreator,
Lister<EventType> eventTypeLister,
DataPlaneContract.Reference reference) {
logger.info("cali0707: enabling eventtype autocreate");
this.eventTypeCreator = eventTypeCreator;
this.eventTypeLister = eventTypeLister;
this.reference = reference;
Expand Down Expand Up @@ -101,7 +102,17 @@ public Future<Void> handle(HttpResponse<Buffer> response) {
TracingSpan.decorateCurrentWithEvent(event);

if (this.isEventTypeAutocreateEnabled) {
this.eventTypeCreator.create(event, this.eventTypeLister, this.reference);
return this.doHandleEvent(event).compose((ignored) -> this.eventTypeCreator
.create(event, this.eventTypeLister, this.reference)
.compose(
eventType -> {
logger.debug("successfully created eventtype {}", eventType);
return Future.succeededFuture();
},
cause -> {
logger.warn("failed to create eventtype", cause);
return Future.failedFuture(cause);
}));
}
return this.doHandleEvent(event);
}
Expand Down

0 comments on commit 7192915

Please sign in to comment.