Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connection options fix #18508

Merged
merged 2 commits into from
Feb 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,15 @@ internal IEventHubConsumerClient GetEventHubConsumerClient(string eventHubName,
EventHubConsumerClient client = null;
if (_options.RegisteredConsumerCredentials.TryGetValue(eventHubName, out var creds))
{
client = new EventHubConsumerClient(consumerGroup, creds.EventHubConnectionString, eventHubName);
client = new EventHubConsumerClient(
consumerGroup,
creds.EventHubConnectionString,
eventHubName,
new EventHubConsumerClientOptions
{
RetryOptions = _options.RetryOptions,
ConnectionOptions = _options.ConnectionOptions
});
}
else if (!string.IsNullOrEmpty(connection))
{
Expand All @@ -138,11 +146,27 @@ internal IEventHubConsumerClient GetEventHubConsumerClient(string eventHubName,
if (info.FullyQualifiedEndpoint != null &&
info.TokenCredential != null)
{
client = new EventHubConsumerClient(consumerGroup, info.FullyQualifiedEndpoint, eventHubName, info.TokenCredential);
client = new EventHubConsumerClient(
consumerGroup,
info.FullyQualifiedEndpoint,
eventHubName,
info.TokenCredential,
new EventHubConsumerClientOptions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Helper method?

{
RetryOptions = _options.RetryOptions,
ConnectionOptions = _options.ConnectionOptions
});
}
else
{
client = new EventHubConsumerClient(consumerGroup, NormalizeConnectionString(info.ConnectionString, eventHubName));
client = new EventHubConsumerClient(
consumerGroup,
NormalizeConnectionString(info.ConnectionString, eventHubName),
new EventHubConsumerClientOptions
{
RetryOptions = _options.RetryOptions,
ConnectionOptions = _options.ConnectionOptions
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@

using System;
using System.Collections.Generic;
using System.Reflection;
using Azure.Identity;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Primitives;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.Azure.WebJobs.EventHubs.Processor;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Options;
Expand All @@ -25,12 +31,12 @@ public void EntityPathInConnectionString(string expectedPathName, string connect
EventHubOptions options = new EventHubOptions();

// Test sender
options.AddSender("k1", connectionString);
options.AddSender(expectedPathName, connectionString);

var configuration = CreateConfiguration();
var factory = new EventHubClientFactory(configuration, Mock.Of<AzureComponentFactory>(), Options.Create(options), new DefaultNameResolver(configuration));

var client = factory.GetEventHubProducerClient("k1", null);
var client = factory.GetEventHubProducerClient(expectedPathName, null);
Assert.AreEqual(expectedPathName, client.EventHubName);
}

Expand All @@ -44,7 +50,7 @@ public void GetEventHubClient_AddsConnection(string expectedPathName, string con

var factory = new EventHubClientFactory(configuration, Mock.Of<AzureComponentFactory>(), Options.Create(options), new DefaultNameResolver(configuration));

var client = factory.GetEventHubProducerClient("k1", "connection");
var client = factory.GetEventHubProducerClient(expectedPathName, "connection");
Assert.AreEqual(expectedPathName, client.EventHubName);
}

Expand Down Expand Up @@ -145,6 +151,120 @@ public void UsesRegisteredConnectionToStorageAccount()
Assert.AreEqual("http://blobs/azure-webjobs-eventhub", client.Uri.ToString());
}

[TestCase("k1", ConnectionString)]
[TestCase("path2", ConnectionStringWithEventHub)]
public void RespectsConnectionOptionsForProducer(string expectedPathName, string connectionString)
{
var testEndpoint = new Uri("http://mycustomendpoint.com");
EventHubOptions options = new EventHubOptions
{
ConnectionOptions = new EventHubConnectionOptions
{
CustomEndpointAddress = testEndpoint
},
RetryOptions = new EventHubsRetryOptions
{
MaximumRetries = 10
}
};

options.AddSender(expectedPathName, connectionString);

var configuration = CreateConfiguration();
var factory = new EventHubClientFactory(configuration, Mock.Of<AzureComponentFactory>(), Options.Create(options), new DefaultNameResolver(configuration));

var producer = factory.GetEventHubProducerClient(expectedPathName, null);
EventHubConnection connection = (EventHubConnection)typeof(EventHubProducerClient).GetProperty("Connection", BindingFlags.NonPublic | BindingFlags.Instance)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://media1.tenor.com/images/861ec956eecd81b0e0588265c7779944/tenor.gif?itemid=4897344

.GetValue(producer);
EventHubConnectionOptions connectionOptions = (EventHubConnectionOptions)typeof(EventHubConnection).GetProperty("Options", BindingFlags.NonPublic | BindingFlags.Instance).GetValue(connection);

Assert.AreEqual(testEndpoint, connectionOptions.CustomEndpointAddress);
Assert.AreEqual(expectedPathName, producer.EventHubName);

EventHubProducerClientOptions producerOptions = (EventHubProducerClientOptions)typeof(EventHubProducerClient).GetProperty("Options", BindingFlags.NonPublic | BindingFlags.Instance).GetValue(producer);
Assert.AreEqual(10, producerOptions.RetryOptions.MaximumRetries);
Assert.AreEqual(expectedPathName, producer.EventHubName);
}

[TestCase("k1", ConnectionString)]
[TestCase("path2", ConnectionStringWithEventHub)]
public void RespectsConnectionOptionsForConsumer(string expectedPathName, string connectionString)
{
var testEndpoint = new Uri("http://mycustomendpoint.com");
EventHubOptions options = new EventHubOptions
{
ConnectionOptions = new EventHubConnectionOptions
{
CustomEndpointAddress = testEndpoint
},
RetryOptions = new EventHubsRetryOptions
{
MaximumRetries = 10
}
};

options.AddReceiver(expectedPathName, connectionString);

var configuration = CreateConfiguration();
var factory = new EventHubClientFactory(configuration, Mock.Of<AzureComponentFactory>(), Options.Create(options), new DefaultNameResolver(configuration));

var consumer = factory.GetEventHubConsumerClient(expectedPathName, null, "consumer");
var consumerClient = (EventHubConsumerClient)typeof(EventHubConsumerClientImpl)
.GetField("_client", BindingFlags.NonPublic | BindingFlags.Instance)
.GetValue(consumer);
EventHubConnection connection = (EventHubConnection)typeof(EventHubConsumerClient)
.GetProperty("Connection", BindingFlags.NonPublic | BindingFlags.Instance)
.GetValue(consumerClient);
EventHubConnectionOptions connectionOptions = (EventHubConnectionOptions)typeof(EventHubConnection)
.GetProperty("Options", BindingFlags.NonPublic | BindingFlags.Instance)
.GetValue(connection);
Assert.AreEqual(testEndpoint, connectionOptions.CustomEndpointAddress);

EventHubsRetryPolicy retryPolicy = (EventHubsRetryPolicy)typeof(EventHubConsumerClient)
.GetProperty("RetryPolicy", BindingFlags.NonPublic | BindingFlags.Instance)
.GetValue(consumerClient);

// Reflection was still necessary here because BasicRetryOptions (which is the concrete derived type)
// is internal.
EventHubsRetryOptions retryOptions = (EventHubsRetryOptions)retryPolicy.GetType()
.GetProperty("Options", BindingFlags.Public | BindingFlags.Instance)
.GetValue(retryPolicy);
Assert.AreEqual(10, retryOptions.MaximumRetries);
Assert.AreEqual(expectedPathName, consumer.EventHubName);
}

[TestCase("k1", ConnectionString)]
[TestCase("path2", ConnectionStringWithEventHub)]
public void RespectsConnectionOptionsForProcessor(string expectedPathName, string connectionString)
{
var testEndpoint = new Uri("http://mycustomendpoint.com");
EventHubOptions options = new EventHubOptions
{
ConnectionOptions = new EventHubConnectionOptions
{
CustomEndpointAddress = testEndpoint
},
RetryOptions = new EventHubsRetryOptions
{
MaximumRetries = 10
}
};

options.AddReceiver(expectedPathName, connectionString);

var configuration = CreateConfiguration();
var factory = new EventHubClientFactory(configuration, Mock.Of<AzureComponentFactory>(), Options.Create(options), new DefaultNameResolver(configuration));

var processor = factory.GetEventProcessorHost(expectedPathName, null, "consumer");
EventProcessorOptions processorOptions = (EventProcessorOptions)typeof(EventProcessor<EventProcessorHostPartition>)
.GetProperty("Options", BindingFlags.NonPublic | BindingFlags.Instance)
.GetValue(processor);
Assert.AreEqual(testEndpoint, processorOptions.ConnectionOptions.CustomEndpointAddress);

Assert.AreEqual(10, processorOptions.RetryOptions.MaximumRetries);
Assert.AreEqual(expectedPathName, processor.EventHubName);
}

private IConfiguration CreateConfiguration(params KeyValuePair<string, string>[] data)
{
return new ConfigurationBuilder().AddInMemoryCollection(data).Build();
Expand Down