From 29e22a0d4308a1de861271a0fba1ec48fe737555 Mon Sep 17 00:00:00 2001 From: Mukundan Sundararajan Date: Thu, 11 Nov 2021 09:52:03 +0530 Subject: [PATCH] rewrite eventhubs component Signed-off-by: Mukundan Sundararajan --- authentication/azure/auth.go | 6 + pubsub/azure/eventhubs/eventhubs.go | 489 ++++++++++++++++++++--- pubsub/azure/eventhubs/eventhubs_test.go | 269 ++++++++++++- tests/config/pubsub/tests.yml | 10 +- 4 files changed, 698 insertions(+), 76 deletions(-) diff --git a/authentication/azure/auth.go b/authentication/azure/auth.go index c865ebcafc..b660e56c22 100644 --- a/authentication/azure/auth.go +++ b/authentication/azure/auth.go @@ -46,6 +46,12 @@ func NewEnvironmentSettings(resourceName string, values map[string]string) (Envi es.Resource = azureEnv.ResourceIdentifiers.CosmosDB case "servicebus": es.Resource = azureEnv.ResourceIdentifiers.ServiceBus + case "eventhubs": + // Azure EventHubs (data plane) + // For documentation https://docs.microsoft.com/en-us/azure/event-hubs/authorize-access-azure-active-directory#overview + // The resource name to request a token is https://eventhubs.azure.net/, and it's the same for all clouds/tenants. + // Kafka connection does not factor in here. + es.Resource = "https://eventhubs.azure.net" default: return es, errors.New("invalid resource name: " + resourceName) } diff --git a/pubsub/azure/eventhubs/eventhubs.go b/pubsub/azure/eventhubs/eventhubs.go index c45d68d6e4..172f2b6c46 100644 --- a/pubsub/azure/eventhubs/eventhubs.go +++ b/pubsub/azure/eventhubs/eventhubs.go @@ -7,38 +7,48 @@ package eventhubs import ( "context" + "encoding/json" "errors" "fmt" "strconv" + "strings" "time" + "github.com/Azure/azure-amqp-common-go/v3/aad" + "github.com/Azure/azure-amqp-common-go/v3/conn" eventhub "github.com/Azure/azure-event-hubs-go/v3" "github.com/Azure/azure-event-hubs-go/v3/eph" "github.com/Azure/azure-event-hubs-go/v3/storage" + mgmt "github.com/Azure/azure-sdk-for-go/services/eventhub/mgmt/2017-04-01/eventhub" "github.com/Azure/azure-storage-blob-go/azblob" "github.com/Azure/go-autorest/autorest/azure" + azauth "github.com/dapr/components-contrib/authentication/azure" "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" "github.com/dapr/kit/retry" ) const ( - // metadata. - connectionString = "connectionString" - consumerID = "consumerID" // passed by dapr runtime - // required by subscriber. - storageAccountName = "storageAccountName" - storageAccountKey = "storageAccountKey" - storageContainerName = "storageContainerName" + // connection string entity path key. + entityPathKey = "EntityPath" + // metadata partitionKey key. + partitionKeyMetadataKey = "partitionKey" // errors. - missingConnectionStringErrorMsg = "error: connectionString is a required attribute" - missingStorageAccountNameErrorMsg = "error: storageAccountName is a required attribute" - missingStorageAccountKeyErrorMsg = "error: storageAccountKey is a required attribute" - missingStorageContainerNameErrorMsg = "error: storageContainerName is a required attribute" - missingConsumerIDErrorMsg = "error: missing consumerID attribute" + hubManagerCreationErrorMsg = "error: creating eventHub manager client" + invalidConnectionStringErrorMsg = "error: connectionString is invalid" + missingConnectionStringNamespaceErrorMsg = "error: connectionString or eventHubNamespace is required" + missingStorageAccountNameErrorMsg = "error: storageAccountName is a required attribute for subscribe" + missingStorageAccountKeyErrorMsg = "error: storageAccountKey is a required attribute for subscribe" + missingStorageContainerNameErrorMsg = "error: storageContainerName is a required attribute for subscribe" + missingConsumerIDErrorMsg = "error: missing consumerID attribute for subscribe" + bothConnectionStringNamespaceErrorMsg = "error: both connectionString and eventHubNamespace are given, only one should be given" + missingResourceGroupNameMsg = "error: missing resourceGroupName attribute required for entityManagement" + missingSubscriptionIDMsg = "error: missing subscriptionID attribute required for entityManagement" + entityManagementConnectionStrMsg = "error: entity management support is not available with connectionString" + differentTopicConnectionStringErrorTmpl = "error: specified topic %s does not match the event hub name in the provided connectionString" // Event Hubs SystemProperties names for metadata passthrough. sysPropSequenceNumber = "x-opt-sequence-number" @@ -52,6 +62,18 @@ const ( sysPropIotHubConnectionModuleID = "iothub-connection-module-id" sysPropIotHubEnqueuedTime = "iothub-enqueuedtime" sysPropMessageID = "message-id" + + defaultMessageRetentionInDays = 1 + defaultPartitionCount = 1 + + resourceCheckMaxRetry = 5 + resourceCheckMaxRetryInterval time.Duration = 5 * time.Minute + resourceCreationTimeout time.Duration = 15 * time.Second + resourceGetTimeout time.Duration = 5 * time.Second + + // See https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quotas for numbers. + maxMessageRetention = int32(90) + maxPartitionCount = int32(1024) ) func subscribeHandler(ctx context.Context, topic string, e *eventhub.Event, handler pubsub.Handler) error { @@ -98,21 +120,32 @@ func subscribeHandler(ctx context.Context, topic string, e *eventhub.Event, hand // AzureEventHubs allows sending/receiving Azure Event Hubs events. type AzureEventHubs struct { - hub *eventhub.Hub - metadata azureEventHubsMetadata - - logger logger.Logger - ctx context.Context - cancel context.CancelFunc - backOffConfig retry.Config + metadata *azureEventHubsMetadata + logger logger.Logger + ctx context.Context + cancel context.CancelFunc + backOffConfig retry.Config + hubClients map[string]*eventhub.Hub + eventProcessors map[string]*eph.EventProcessorHost + hubManager *eventhub.HubManager + eventHubSettings azauth.EnvironmentSettings + managementSettings azauth.EnvironmentSettings + cgClient *mgmt.ConsumerGroupsClient + tokenProvider *aad.TokenProvider } type azureEventHubsMetadata struct { - connectionString string - consumerGroup string - storageAccountName string - storageAccountKey string - storageContainerName string + ConnectionString string `json:"connectionString,omitempty"` + EventHubNamespace string `json:"eventHubNamespace,omitempty"` + ConsumerGroup string `json:"consumerID"` + StorageAccountName string `json:"storageAccountName,omitempty"` + StorageAccountKey string `json:"storageAccountKey,omitempty"` + StorageContainerName string `json:"storageContainerName,omitempty"` + EnableEnitityManagement bool `json:"enableEntityManagement,omitempty,string"` + MessageRetentionInDays int32 `json:"messageRetentionInDays,omitempty,string"` + PartitionCount int32 `json:"partitionCount,omitempty,string"` + SubscriptionID string `json:"subscriptionID,omitempty"` + ResourceGroupName string `json:"resourceGroupName,omitempty"` } // NewAzureEventHubs returns a new Azure Event hubs instance. @@ -120,40 +153,287 @@ func NewAzureEventHubs(logger logger.Logger) *AzureEventHubs { return &AzureEventHubs{logger: logger} } -func parseEventHubsMetadata(meta pubsub.Metadata) (azureEventHubsMetadata, error) { +func parseEventHubsMetadata(meta pubsub.Metadata) (*azureEventHubsMetadata, error) { + b, err := json.Marshal(meta.Properties) + if err != nil { + return nil, err + } + m := azureEventHubsMetadata{} + err = json.Unmarshal(b, &m) + if err != nil { + return nil, err + } - if val, ok := meta.Properties[connectionString]; ok && val != "" { - m.connectionString = val - } else { - return m, errors.New(missingConnectionStringErrorMsg) + if m.ConnectionString == "" && m.EventHubNamespace == "" { + return &m, errors.New(missingConnectionStringNamespaceErrorMsg) } - if val, ok := meta.Properties[storageAccountName]; ok && val != "" { - m.storageAccountName = val - } else { - return m, errors.New(missingStorageAccountNameErrorMsg) + if m.ConnectionString != "" && m.EventHubNamespace != "" { + return &m, errors.New(bothConnectionStringNamespaceErrorMsg) } - if val, ok := meta.Properties[storageAccountKey]; ok && val != "" { - m.storageAccountKey = val - } else { - return m, errors.New(missingStorageAccountKeyErrorMsg) + return &m, nil +} + +func validateAndGetHubName(connectionString string) (string, error) { + parsed, err := conn.ParsedConnectionFromStr(connectionString) + if err != nil { + return "", err } + return parsed.HubName, nil +} - if val, ok := meta.Properties[storageContainerName]; ok && val != "" { - m.storageContainerName = val - } else { - return m, errors.New(missingStorageContainerNameErrorMsg) +func (aeh *AzureEventHubs) ensureEventHub(hubName string) error { + if aeh.hubManager == nil { + aeh.logger.Errorf("hubManager client not initialized properly.") + return fmt.Errorf("hubManager client not initialized properly") + } + entity, err := aeh.getHubEntity(hubName) + if err != nil { + return err + } + if entity == nil { + if err := aeh.createHubEntity(hubName); err != nil { + return err + } } + return nil +} + +func (aeh *AzureEventHubs) ensureSubscription(hubName string) error { + err := aeh.ensureEventHub(hubName) + if err != nil { + return err + } + _, err = aeh.getConsumerGroupsClient() + if err != nil { + return err + } + return aeh.createConsumerGroup(hubName) +} + +func (aeh *AzureEventHubs) getConsumerGroupsClient() (*mgmt.ConsumerGroupsClient, error) { + if aeh.cgClient != nil { + return aeh.cgClient, nil + } + client := mgmt.NewConsumerGroupsClientWithBaseURI(aeh.managementSettings.AzureEnvironment.ResourceManagerEndpoint, + aeh.metadata.SubscriptionID) + a, err := aeh.managementSettings.GetAuthorizer() + if err != nil { + return nil, err + } + client.Authorizer = a + aeh.cgClient = &client + return aeh.cgClient, nil +} + +func (aeh *AzureEventHubs) createConsumerGroup(hubName string) error { + create := false + backOffConfig := retry.DefaultConfig() + backOffConfig.Policy = retry.PolicyExponential + backOffConfig.MaxInterval = resourceCheckMaxRetryInterval + backOffConfig.MaxRetries = resourceCheckMaxRetry + + b := backOffConfig.NewBackOffWithContext(aeh.ctx) + + err := retry.NotifyRecover(func() error { + c, err := aeh.shouldCreateConsumerGroup(hubName) + if err == nil { + create = c + return nil + } + return err + }, b, func(_ error, _ time.Duration) { + aeh.logger.Errorf("Error checking for consumer group for EventHub : %s. Retrying...", hubName) + }, func() { + aeh.logger.Warnf("Successfully checked for consumer group in EventHub %s after it previously failed.", hubName) + }) + if err != nil { + return err + } + if create { + ctx, cancel := context.WithTimeout(aeh.ctx, resourceCreationTimeout) + defer cancel() + _, err = aeh.cgClient.CreateOrUpdate(ctx, aeh.metadata.ResourceGroupName, aeh.metadata.EventHubNamespace, hubName, aeh.metadata.ConsumerGroup, mgmt.ConsumerGroup{}) + if err != nil { + return err + } + } + return nil +} + +func (aeh *AzureEventHubs) shouldCreateConsumerGroup(hubName string) (bool, error) { + ctx, cancel := context.WithTimeout(aeh.ctx, resourceGetTimeout) + defer cancel() + g, err := aeh.cgClient.Get(ctx, aeh.metadata.ResourceGroupName, aeh.metadata.EventHubNamespace, hubName, aeh.metadata.ConsumerGroup) + if err != nil { + if g.HasHTTPStatus(404) { + return true, nil + } + return false, err + } + if *g.Name == aeh.metadata.ConsumerGroup { + aeh.logger.Infof("consumer group %s exists for the requested topic/eventHub %s", aeh.metadata.ConsumerGroup, hubName) + } + return false, nil +} + +func (aeh *AzureEventHubs) getHubEntity(hubName string) (*eventhub.HubEntity, error) { + ctx, cancel := context.WithTimeout(aeh.ctx, resourceGetTimeout) + defer cancel() + return aeh.hubManager.Get(ctx, hubName) +} + +func (aeh *AzureEventHubs) createHubEntity(hubName string) error { + ctx, cancel := context.WithTimeout(aeh.ctx, resourceCreationTimeout) + defer cancel() + _, err := aeh.hubManager.Put(ctx, hubName, + eventhub.HubWithMessageRetentionInDays(aeh.metadata.MessageRetentionInDays), + eventhub.HubWithPartitionCount(aeh.metadata.PartitionCount)) + if err != nil { + aeh.logger.Errorf("error creating event hub %s: %s", hubName, err) + return fmt.Errorf("error creating event hub %s: %s", hubName, err) + } + return nil +} - if val, ok := meta.Properties[consumerID]; ok && val != "" { - m.consumerGroup = val +func (aeh *AzureEventHubs) ensurePublisherClient(hubName string) error { + if aeh.metadata.EnableEnitityManagement { + if err := aeh.ensureEventHub(hubName); err != nil { + return err + } + } + userAgent := "dapr-" + logger.DaprVersion + if aeh.metadata.ConnectionString != "" { + // Connect with connection string. + newConnectionString, err := aeh.constructConnectionStringFromTopic(hubName) + if err != nil { + return err + } + + hub, err := eventhub.NewHubFromConnectionString(newConnectionString, + eventhub.HubWithUserAgent(userAgent)) + if err != nil { + aeh.logger.Debugf("unable to connect to azure event hubs: %v", err) + return fmt.Errorf("unable to connect to azure event hubs: %v", err) + } + aeh.hubClients[hubName] = hub } else { - return m, errors.New(missingConsumerIDErrorMsg) + if hubName == "" { + return errors.New("error: missing topic/hubName attribute with AAD connection") + } + + hub, err := eventhub.NewHub(aeh.metadata.EventHubNamespace, hubName, aeh.tokenProvider, eventhub.HubWithUserAgent(userAgent)) + if err != nil { + return fmt.Errorf("unable to connect to azure event hubs: %v", err) + } + aeh.hubClients[hubName] = hub + } + + return nil +} + +func (aeh *AzureEventHubs) ensureSubscriberClient(topic string, leaserCheckpointer *storage.LeaserCheckpointer) (*eph.EventProcessorHost, error) { + // connectionString given. + if aeh.metadata.ConnectionString != "" { + hubName, err := validateAndGetHubName(aeh.metadata.ConnectionString) + if err != nil { + return nil, fmt.Errorf("error parsing connection string %s", err) + } + if hubName != "" && hubName != topic { + return nil, fmt.Errorf("error: component cannot subscribe to requested topic %s with the given connectionString", topic) + } + if hubName == "" { + aeh.logger.Debugf("eventhub namespace connection string given. using topic as event hub entity path") + } + connectionString, err := aeh.constructConnectionStringFromTopic(topic) + if err != nil { + return nil, err + } + processor, err := eph.NewFromConnectionString(aeh.ctx, connectionString, leaserCheckpointer, leaserCheckpointer, eph.WithNoBanner(), eph.WithConsumerGroup(aeh.metadata.ConsumerGroup)) + if err != nil { + return nil, err + } + aeh.logger.Debugf("processor initialized via connection string for topic %s", topic) + return processor, nil + } + // AAD connection. + processor, err := eph.New(aeh.ctx, aeh.metadata.EventHubNamespace, topic, aeh.tokenProvider, leaserCheckpointer, leaserCheckpointer, eph.WithNoBanner(), eph.WithConsumerGroup(aeh.metadata.ConsumerGroup)) + if err != nil { + return nil, err + } + aeh.logger.Debugf("processor initialized via AAD for topic %s", topic) + + return processor, nil +} + +func (aeh *AzureEventHubs) createHubManager() error { + // Only AAD based authentication supported. + hubManager, err := eventhub.NewHubManagerFromAzureEnvironment(aeh.metadata.EventHubNamespace, aeh.tokenProvider, *aeh.eventHubSettings.AzureEnvironment) + if err != nil { + return fmt.Errorf("%s %s", hubManagerCreationErrorMsg, err) + } + aeh.hubManager = hubManager + + return nil +} + +func (aeh *AzureEventHubs) constructConnectionStringFromTopic(requestedTopic string) (string, error) { + hubName, err := validateAndGetHubName(aeh.metadata.ConnectionString) + if err != nil { + return "", err } + if hubName != "" && hubName == requestedTopic { + return aeh.metadata.ConnectionString, nil + } else if hubName != "" { + return "", fmt.Errorf(differentTopicConnectionStringErrorTmpl, requestedTopic) + } + return aeh.metadata.ConnectionString + ";" + entityPathKey + "=" + requestedTopic, nil +} - return m, nil +func (aeh *AzureEventHubs) validateEnitityManagementMetadata() error { + if aeh.metadata.MessageRetentionInDays <= 0 || aeh.metadata.MessageRetentionInDays > maxMessageRetention { + aeh.logger.Warnf("invalid/no message retention time period is given with entity management enabled, default value of %d is used", defaultMessageRetentionInDays) + aeh.metadata.MessageRetentionInDays = defaultMessageRetentionInDays + } + if aeh.metadata.PartitionCount <= 0 || aeh.metadata.PartitionCount > maxPartitionCount { + aeh.logger.Warnf("invalid/no partition count is given with entity management enabled, default value of %d is used", defaultPartitionCount) + aeh.metadata.PartitionCount = defaultPartitionCount + } + if aeh.metadata.ResourceGroupName == "" { + return errors.New(missingResourceGroupNameMsg) + } + if aeh.metadata.SubscriptionID == "" { + return errors.New(missingSubscriptionIDMsg) + } + return nil +} + +func (aeh *AzureEventHubs) validateSubscriptionAttributes() error { + m := *aeh.metadata + + if m.StorageAccountName == "" { + return errors.New(missingStorageAccountNameErrorMsg) + } + + if m.StorageAccountKey == "" { + return errors.New(missingStorageAccountKeyErrorMsg) + } + + if m.StorageContainerName == "" { + return errors.New(missingStorageContainerNameErrorMsg) + } + + if m.ConsumerGroup == "" { + return errors.New(missingConsumerIDErrorMsg) + } + return nil +} + +func (aeh *AzureEventHubs) getStoragePrefixString(topic string) string { + // empty string in the end of slice to have a suffix "-". + return strings.Join([]string{"dapr", topic, aeh.metadata.ConsumerGroup, ""}, "-") } // Init connects to Azure Event Hubs. @@ -162,15 +442,61 @@ func (aeh *AzureEventHubs) Init(metadata pubsub.Metadata) error { if err != nil { return err } - userAgent := "dapr-" + logger.DaprVersion + aeh.metadata = m - hub, err := eventhub.NewHubFromConnectionString(aeh.metadata.connectionString, - eventhub.HubWithUserAgent(userAgent)) - if err != nil { - return fmt.Errorf("unable to connect to azure event hubs: %v", err) + aeh.eventProcessors = map[string]*eph.EventProcessorHost{} + aeh.hubClients = map[string]*eventhub.Hub{} + + if aeh.metadata.ConnectionString != "" { + // Validate connectionString. + hubName, err := validateAndGetHubName(aeh.metadata.ConnectionString) + if err != nil { + return errors.New(invalidConnectionStringErrorMsg) + } + if hubName != "" { + aeh.logger.Infof("connectionString provided is specific to event hub %q. Publishing or subscribing to a topic that does not match this event hub will fail when attempted.", hubName) + } else { + aeh.logger.Infof("hubName not given in connectionString. connection established on first publish/subscribe") + aeh.logger.Debugf("req.Topic field in incoming requests honored") + } + if aeh.metadata.EnableEnitityManagement { + // See https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-management-libraries + return errors.New(entityManagementConnectionStrMsg) + } + } else { + // Connect via AAD. + settings, sErr := azauth.NewEnvironmentSettings("eventhubs", metadata.Properties) + if sErr != nil { + return sErr + } + aeh.eventHubSettings = settings + tokenProvider, err := aeh.eventHubSettings.GetAADTokenProvider() + if err != nil { + return fmt.Errorf("%s %s", hubManagerCreationErrorMsg, err) + } + aeh.tokenProvider = tokenProvider + aeh.logger.Info("connecting to Azure EventHubs via AAD. connection established on first publish/subscribe") + aeh.logger.Debugf("req.Topic field in incoming requests honored") + + if aeh.metadata.EnableEnitityManagement { + if err := aeh.validateEnitityManagementMetadata(); err != nil { + return err + } + + // Create hubManager for eventHub management with AAD. + if err := aeh.createHubManager(); err != nil { + return err + } + + // Get Azure Management plane settings for creating consumer groups using event hubs management client. + settings, err := azauth.NewEnvironmentSettings("azure", metadata.Properties) + if err != nil { + return err + } + aeh.managementSettings = settings + } } - aeh.hub = hub aeh.ctx, aeh.cancel = context.WithCancel(context.Background()) // Default retry configuration is used if no backOff properties are set. @@ -186,7 +512,17 @@ func (aeh *AzureEventHubs) Init(metadata pubsub.Metadata) error { // Publish sends data to Azure Event Hubs. func (aeh *AzureEventHubs) Publish(req *pubsub.PublishRequest) error { - err := aeh.hub.Send(aeh.ctx, &eventhub.Event{Data: req.Data}) + if _, ok := aeh.hubClients[req.Topic]; !ok { + if err := aeh.ensurePublisherClient(req.Topic); err != nil { + return fmt.Errorf("error on establishing hub connection: %s", err) + } + } + event := &eventhub.Event{Data: req.Data} + val, ok := req.Metadata[partitionKeyMetadataKey] + if ok { + event.PartitionKey = &val + } + err := aeh.hubClients[req.Topic].Send(aeh.ctx, event) if err != nil { return fmt.Errorf("error from publish: %s", err) } @@ -196,30 +532,43 @@ func (aeh *AzureEventHubs) Publish(req *pubsub.PublishRequest) error { // Subscribe receives data from Azure Event Hubs. func (aeh *AzureEventHubs) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) error { - cred, err := azblob.NewSharedKeyCredential(aeh.metadata.storageAccountName, aeh.metadata.storageAccountKey) + err := aeh.validateSubscriptionAttributes() + if err != nil { + return fmt.Errorf("error : error on subscribe %s", err) + } + if aeh.metadata.EnableEnitityManagement { + if err = aeh.ensureSubscription(req.Topic); err != nil { + return err + } + } + cred, err := azblob.NewSharedKeyCredential(aeh.metadata.StorageAccountName, aeh.metadata.StorageAccountKey) if err != nil { return err } - leaserCheckpointer, err := storage.NewStorageLeaserCheckpointer(cred, aeh.metadata.storageAccountName, aeh.metadata.storageContainerName, azure.PublicCloud) + // Set topic name, consumerID prefix for partition checkpoint lease blob path. + // This is needed to support multiple consumers for the topic using the same storage container. + leaserPrefixOpt := storage.WithPrefixInBlobPath(aeh.getStoragePrefixString(req.Topic)) + leaserCheckpointer, err := storage.NewStorageLeaserCheckpointer(cred, aeh.metadata.StorageAccountName, aeh.metadata.StorageContainerName, azure.PublicCloud, leaserPrefixOpt) if err != nil { return err } - processor, err := eph.NewFromConnectionString(aeh.ctx, aeh.metadata.connectionString, leaserCheckpointer, leaserCheckpointer, eph.WithNoBanner(), eph.WithConsumerGroup(aeh.metadata.consumerGroup)) + processor, err := aeh.ensureSubscriberClient(req.Topic, leaserCheckpointer) if err != nil { return err } + aeh.logger.Debugf("registering handler for topic %s", req.Topic) _, err = processor.RegisterHandler(aeh.ctx, - func(c context.Context, e *eventhub.Event) error { + func(_ context.Context, e *eventhub.Event) error { b := aeh.backOffConfig.NewBackOffWithContext(aeh.ctx) return retry.NotifyRecover(func() error { aeh.logger.Debugf("Processing EventHubs event %s/%s", req.Topic, e.ID) return subscribeHandler(aeh.ctx, req.Topic, e, handler) - }, b, func(err error, d time.Duration) { + }, b, func(_ error, _ time.Duration) { aeh.logger.Errorf("Error processing EventHubs event: %s/%s. Retrying...", req.Topic, e.ID) }, func() { aeh.logger.Errorf("Successfully processed EventHubs event after it previously failed: %s/%s", req.Topic, e.ID) @@ -233,14 +582,34 @@ func (aeh *AzureEventHubs) Subscribe(req pubsub.SubscribeRequest, handler pubsub if err != nil { return err } + aeh.eventProcessors[req.Topic] = processor return nil } func (aeh *AzureEventHubs) Close() error { - aeh.cancel() - - return aeh.hub.Close(aeh.ctx) + defer aeh.cancel() + flag := false + for topic, client := range aeh.hubClients { + err := client.Close(aeh.ctx) + if err != nil { + flag = true + aeh.logger.Warnf("error closing publish client properly for topic/eventHub %s: %s", topic, err) + } + } + aeh.hubClients = map[string]*eventhub.Hub{} + for topic, client := range aeh.eventProcessors { + err := client.Close(aeh.ctx) + if err != nil { + flag = true + aeh.logger.Warnf("error closing event processor host client properly for topic/eventHub %s: %s", topic, err) + } + } + aeh.eventProcessors = map[string]*eph.EventProcessorHost{} + if flag { + return errors.New("error closing event hub clients in a proper fashion") + } + return nil } func (aeh *AzureEventHubs) Features() []pubsub.Feature { diff --git a/pubsub/azure/eventhubs/eventhubs_test.go b/pubsub/azure/eventhubs/eventhubs_test.go index d9b8c8eb1c..43c08592c2 100644 --- a/pubsub/azure/eventhubs/eventhubs_test.go +++ b/pubsub/azure/eventhubs/eventhubs_test.go @@ -6,26 +6,78 @@ package eventhubs import ( + "fmt" "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/dapr/components-contrib/pubsub" + "github.com/dapr/kit/logger" ) +var testLogger = logger.NewLogger("test") + func TestParseEventHubsMetadata(t *testing.T) { + t.Run("test valid connectionString configuration", func(t *testing.T) { + props := map[string]string{"connectionString": "fake"} + + metadata := pubsub.Metadata{Properties: props} + m, err := parseEventHubsMetadata(metadata) + + assert.NoError(t, err) + assert.Equal(t, "fake", m.ConnectionString) + }) + + t.Run("test namespace given", func(t *testing.T) { + props := map[string]string{"eventHubNamespace": "fake"} + + metadata := pubsub.Metadata{Properties: props} + m, err := parseEventHubsMetadata(metadata) + + assert.NoError(t, err) + assert.Equal(t, "fake", m.EventHubNamespace) + }) + + t.Run("test both connectionString and eventHubNamespace given", func(t *testing.T) { + props := map[string]string{"connectionString": "fake", "eventHubNamespace": "fake"} + + metadata := pubsub.Metadata{Properties: props} + _, err := parseEventHubsMetadata(metadata) + + assert.Error(t, err) + assert.Equal(t, bothConnectionStringNamespaceErrorMsg, err.Error()) + }) + + t.Run("test missing metadata", func(t *testing.T) { + props := map[string]string{} + + metadata := pubsub.Metadata{Properties: props} + _, err := parseEventHubsMetadata(metadata) + + assert.Error(t, err) + assert.Equal(t, missingConnectionStringNamespaceErrorMsg, err.Error()) + }) +} + +func TestValidateSubscriptionAttributes(t *testing.T) { t.Run("test valid configuration", func(t *testing.T) { - props := map[string]string{"connectionString": "fake", "consumerID": "mygroup", "storageAccountName": "account", "storageAccountKey": "key", "storageContainerName": "container"} + props := map[string]string{"connectionString": "fake", "consumerID": "fake", "storageAccountName": "account", "storageAccountKey": "key", "storageContainerName": "container"} metadata := pubsub.Metadata{Properties: props} m, err := parseEventHubsMetadata(metadata) assert.NoError(t, err) - assert.Equal(t, m.connectionString, "fake") - assert.Equal(t, m.storageAccountName, "account") - assert.Equal(t, m.storageAccountKey, "key") - assert.Equal(t, m.storageContainerName, "container") - assert.Equal(t, m.consumerGroup, "mygroup") + aeh := &AzureEventHubs{logger: testLogger, metadata: m} + assert.Equal(t, m.ConnectionString, "fake") + assert.Equal(t, m.StorageAccountName, "account") + assert.Equal(t, m.StorageAccountKey, "key") + assert.Equal(t, m.StorageContainerName, "container") + assert.Equal(t, m.ConsumerGroup, "fake") + + err = aeh.validateSubscriptionAttributes() + + assert.NoError(t, err) }) type invalidConfigTestCase struct { @@ -39,11 +91,6 @@ func TestParseEventHubsMetadata(t *testing.T) { map[string]string{"connectionString": "fake", "storageAccountName": "account", "storageAccountKey": "key", "storageContainerName": "container"}, missingConsumerIDErrorMsg, }, - { - "missing connectionString", - map[string]string{"consumerID": "fake", "storageAccountName": "account", "storageAccountKey": "key", "storageContainerName": "container"}, - missingConnectionStringErrorMsg, - }, { "missing storageAccountName", map[string]string{"consumerID": "fake", "connectionString": "fake", "storageAccountKey": "key", "storageContainerName": "container"}, @@ -64,9 +111,205 @@ func TestParseEventHubsMetadata(t *testing.T) { for _, c := range invalidConfigTestCases { t.Run(c.name, func(t *testing.T) { metadata := pubsub.Metadata{Properties: c.config} - _, err := parseEventHubsMetadata(metadata) + m, err := parseEventHubsMetadata(metadata) + aeh := &AzureEventHubs{logger: testLogger, metadata: m} + require.NoError(t, err) + err = aeh.validateSubscriptionAttributes() assert.Error(t, err) - assert.Equal(t, err.Error(), c.errMsg) + assert.Equal(t, c.errMsg, err.Error()) }) } } + +func TestValidateEnitityManagementMetadata(t *testing.T) { + t.Run("test valid configuration", func(t *testing.T) { + props := map[string]string{"eventHubNamespace": "fake", "messageRetentionInDays": "2", "partitionCount": "3", "resourceGroupName": "rg", "subscriptionID": "id"} + + metadata := pubsub.Metadata{Properties: props} + m, err := parseEventHubsMetadata(metadata) + + require.NoError(t, err) + aeh := &AzureEventHubs{logger: testLogger, metadata: m} + assert.Equal(t, "fake", m.EventHubNamespace) + assert.Equal(t, int32(2), m.MessageRetentionInDays) + assert.Equal(t, int32(3), m.PartitionCount) + + err = aeh.validateEnitityManagementMetadata() + assert.NoError(t, err) + assert.Equal(t, int32(2), m.MessageRetentionInDays) + assert.Equal(t, int32(3), m.PartitionCount) + assert.Equal(t, "rg", m.ResourceGroupName) + assert.Equal(t, "id", m.SubscriptionID) + }) + + t.Run("test valid configuration", func(t *testing.T) { + props := map[string]string{"eventHubNamespace": "fake", "resourceGroupName": "rg", "subscriptionID": "id"} + + metadata := pubsub.Metadata{Properties: props} + m, err := parseEventHubsMetadata(metadata) + + require.NoError(t, err) + aeh := &AzureEventHubs{logger: testLogger, metadata: m} + assert.Equal(t, "fake", m.EventHubNamespace) + assert.Equal(t, int32(0), m.MessageRetentionInDays) + assert.Equal(t, int32(0), m.PartitionCount) + + err = aeh.validateEnitityManagementMetadata() + assert.NoError(t, err) + assert.Equal(t, int32(1), m.MessageRetentionInDays) + assert.Equal(t, int32(1), m.PartitionCount) + assert.Equal(t, "rg", m.ResourceGroupName) + assert.Equal(t, "id", m.SubscriptionID) + }) + + type invalidConfigTestCase struct { + name string + config map[string]string + messageRetentionInDays int32 + partitionCount int32 + errMsg string + } + invalidConfigTestCases := []invalidConfigTestCase{ + { + "negative message rentention days", + map[string]string{"eventHubNamespace": "fake", "messageRetentionInDays": "-2", "resourceGroupName": "rg", "subscriptionID": "id"}, + defaultMessageRetentionInDays, + defaultPartitionCount, + "", + }, + { + "more than max message rentention days", + map[string]string{"eventHubNamespace": "fake", "messageRetentionInDays": "91", "resourceGroupName": "rg", "subscriptionID": "id"}, + defaultMessageRetentionInDays, + defaultPartitionCount, + "", + }, + { + "negative partition count", + map[string]string{"eventHubNamespace": "fake", "partitionCount": "-2", "resourceGroupName": "rg", "subscriptionID": "id"}, + defaultMessageRetentionInDays, + defaultPartitionCount, + "", + }, + { + "more than max partition count", + map[string]string{"eventHubNamespace": "fake", "partitionCount": "1030", "resourceGroupName": "rg", "subscriptionID": "id"}, + defaultMessageRetentionInDays, + defaultPartitionCount, + "", + }, + { + "missingResourceGroupName", + map[string]string{"eventHubNamespace": "fake", "subscriptionID": "id"}, + defaultMessageRetentionInDays, + defaultPartitionCount, + missingResourceGroupNameMsg, + }, + { + "missingSubscriptionID", + map[string]string{"eventHubNamespace": "fake", "resourceGroupName": "id"}, + defaultMessageRetentionInDays, + defaultPartitionCount, + missingSubscriptionIDMsg, + }, + } + + for _, c := range invalidConfigTestCases { + t.Run(c.name, func(t *testing.T) { + metadata := pubsub.Metadata{Properties: c.config} + m, err := parseEventHubsMetadata(metadata) + aeh := &AzureEventHubs{logger: testLogger, metadata: m} + require.NoError(t, err) + err = aeh.validateEnitityManagementMetadata() + if c.errMsg != "" { + assert.Error(t, err) + assert.Equal(t, c.errMsg, err.Error()) + } else { + assert.NoError(t, err) + } + assert.Equal(t, c.messageRetentionInDays, aeh.metadata.MessageRetentionInDays) + assert.Equal(t, c.partitionCount, aeh.metadata.PartitionCount) + }) + } +} + +func TestGetStoragePrefixString(t *testing.T) { + props := map[string]string{"connectionString": "fake", "consumerID": "test"} + + metadata := pubsub.Metadata{Properties: props} + m, err := parseEventHubsMetadata(metadata) + + require.NoError(t, err) + + aeh := &AzureEventHubs{logger: testLogger, metadata: m} + + actual := aeh.getStoragePrefixString("topic") + + assert.Equal(t, "dapr-topic-test-", actual) +} + +func TestValidateAndGetHubName(t *testing.T) { + t.Run("valid connectionString with hub name", func(t *testing.T) { + connectionString := "Endpoint=sb://fake.servicebus.windows.net/;SharedAccessKeyName=fakeKey;SharedAccessKey=key;EntityPath=testHub" + h, err := validateAndGetHubName(connectionString) + assert.NoError(t, err) + assert.Equal(t, "testHub", h) + }) + + t.Run("valid connectionString without hub name", func(t *testing.T) { + connectionString := "Endpoint=sb://fake.servicebus.windows.net/;SharedAccessKeyName=fakeKey;SharedAccessKey=key" + h, err := validateAndGetHubName(connectionString) + assert.NoError(t, err) + assert.Empty(t, h) + }) + + t.Run("invalid connectionString ", func(t *testing.T) { + connectionString := "Endpoint=sb://fake.servicebus.windows.net/;ShareKeyName=fakeKey;SharedAccessKey=key" + _, err := validateAndGetHubName(connectionString) + assert.Error(t, err) + }) +} + +func TestConstructConnectionStringFromTopic(t *testing.T) { + t.Run("valid connectionString without hub name", func(t *testing.T) { + connectionString := "Endpoint=sb://fake.servicebus.windows.net/;SharedAccessKeyName=fakeKey;SharedAccessKey=key" + topic := "testHub" + + aeh := &AzureEventHubs{logger: testLogger, metadata: &azureEventHubsMetadata{ConnectionString: connectionString}} + + c, err := aeh.constructConnectionStringFromTopic(topic) + assert.NoError(t, err) + assert.Equal(t, connectionString+";EntityPath=testHub", c) + }) + t.Run("valid connectionString with hub name", func(t *testing.T) { + connectionString := "Endpoint=sb://fake.servicebus.windows.net/;SharedAccessKeyName=fakeKey;SharedAccessKey=key;EntityPath=testHub" + topic := "testHub" + + aeh := &AzureEventHubs{logger: testLogger, metadata: &azureEventHubsMetadata{ConnectionString: connectionString}} + + c, err := aeh.constructConnectionStringFromTopic(topic) + assert.NoError(t, err) + assert.Equal(t, connectionString, c) + }) + t.Run("invalid connectionString with hub name", func(t *testing.T) { + connectionString := "Endpoint=sb://fake.servicebus.windows.net/;SharedAccessKeyName=fakeKey;ShareKey=key;EntityPath=testHub" + topic := "testHub" + + aeh := &AzureEventHubs{logger: testLogger, metadata: &azureEventHubsMetadata{ConnectionString: connectionString}} + + c, err := aeh.constructConnectionStringFromTopic(topic) + assert.Error(t, err) + assert.Equal(t, "", c) + }) + t.Run("valid connectionString with different hub name", func(t *testing.T) { + connectionString := "Endpoint=sb://fake.servicebus.windows.net/;SharedAccessKeyName=fakeKey;SharedAccessKey=key;EntityPath=testHub" + topic := "differentHub" + + aeh := &AzureEventHubs{logger: testLogger, metadata: &azureEventHubsMetadata{ConnectionString: connectionString}} + + c, err := aeh.constructConnectionStringFromTopic(topic) + assert.Error(t, err) + assert.Equal(t, (fmt.Sprintf(differentTopicConnectionStringErrorTmpl, topic)), err.Error()) + assert.Equal(t, "", c) + }) +} diff --git a/tests/config/pubsub/tests.yml b/tests/config/pubsub/tests.yml index 1550a024f5..0795f873b3 100644 --- a/tests/config/pubsub/tests.yml +++ b/tests/config/pubsub/tests.yml @@ -2,8 +2,8 @@ # Config map: ## pubsubName : name of the pubsub ## testTopicName: name of the test topic to use -## publish: A map of strings that will be part of the publish metadata in the Publish call -## subscribe: A map of strings that will be part of the subscribe metadata in the Subscribe call +## publishMetadata: A map of strings that will be part of the publish metadata in the Publish call +## subscribeMetadata: A map of strings that will be part of the subscribe metadata in the Subscribe call ## maxReadDuration: duration to wait for read to complete ## messageCount: no. of messages to publish ## checkInOrderProcessing: false disables in-order message processing checking @@ -12,7 +12,11 @@ components: - component: azure.eventhubs allOperations: true config: - checkInOrderProcessing: false + ## with partition key set, inorder processing is guaranteed. + ## https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-features#mapping-of-events-to-partitions + checkInOrderProcessing: true + publishMetadata: + partitionKey: abcd - component: azure.servicebus allOperations: true config: