Skip to content

Commit

Permalink
Connection options fix (Azure#18508)
Browse files Browse the repository at this point in the history
* Pass options to consumer client

* fix
  • Loading branch information
JoshLove-msft authored and jongio committed Feb 9, 2021
1 parent 4d27fd1 commit 3d1ab95
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,15 @@ internal IEventHubConsumerClient GetEventHubConsumerClient(string eventHubName,
EventHubConsumerClient client = null;
if (_options.RegisteredConsumerCredentials.TryGetValue(eventHubName, out var creds))
{
client = new EventHubConsumerClient(consumerGroup, creds.EventHubConnectionString, eventHubName);
client = new EventHubConsumerClient(
consumerGroup,
creds.EventHubConnectionString,
eventHubName,
new EventHubConsumerClientOptions
{
RetryOptions = _options.RetryOptions,
ConnectionOptions = _options.ConnectionOptions
});
}
else if (!string.IsNullOrEmpty(connection))
{
Expand All @@ -138,11 +146,27 @@ internal IEventHubConsumerClient GetEventHubConsumerClient(string eventHubName,
if (info.FullyQualifiedEndpoint != null &&
info.TokenCredential != null)
{
client = new EventHubConsumerClient(consumerGroup, info.FullyQualifiedEndpoint, eventHubName, info.TokenCredential);
client = new EventHubConsumerClient(
consumerGroup,
info.FullyQualifiedEndpoint,
eventHubName,
info.TokenCredential,
new EventHubConsumerClientOptions
{
RetryOptions = _options.RetryOptions,
ConnectionOptions = _options.ConnectionOptions
});
}
else
{
client = new EventHubConsumerClient(consumerGroup, NormalizeConnectionString(info.ConnectionString, eventHubName));
client = new EventHubConsumerClient(
consumerGroup,
NormalizeConnectionString(info.ConnectionString, eventHubName),
new EventHubConsumerClientOptions
{
RetryOptions = _options.RetryOptions,
ConnectionOptions = _options.ConnectionOptions
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@

using System;
using System.Collections.Generic;
using System.Reflection;
using Azure.Identity;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Primitives;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.Azure.WebJobs.EventHubs.Processor;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Options;
Expand All @@ -25,12 +31,12 @@ public void EntityPathInConnectionString(string expectedPathName, string connect
EventHubOptions options = new EventHubOptions();

// Test sender
options.AddSender("k1", connectionString);
options.AddSender(expectedPathName, connectionString);

var configuration = CreateConfiguration();
var factory = new EventHubClientFactory(configuration, Mock.Of<AzureComponentFactory>(), Options.Create(options), new DefaultNameResolver(configuration));

var client = factory.GetEventHubProducerClient("k1", null);
var client = factory.GetEventHubProducerClient(expectedPathName, null);
Assert.AreEqual(expectedPathName, client.EventHubName);
}

Expand All @@ -44,7 +50,7 @@ public void GetEventHubClient_AddsConnection(string expectedPathName, string con

var factory = new EventHubClientFactory(configuration, Mock.Of<AzureComponentFactory>(), Options.Create(options), new DefaultNameResolver(configuration));

var client = factory.GetEventHubProducerClient("k1", "connection");
var client = factory.GetEventHubProducerClient(expectedPathName, "connection");
Assert.AreEqual(expectedPathName, client.EventHubName);
}

Expand Down Expand Up @@ -145,6 +151,120 @@ public void UsesRegisteredConnectionToStorageAccount()
Assert.AreEqual("http://blobs/azure-webjobs-eventhub", client.Uri.ToString());
}

[TestCase("k1", ConnectionString)]
[TestCase("path2", ConnectionStringWithEventHub)]
public void RespectsConnectionOptionsForProducer(string expectedPathName, string connectionString)
{
var testEndpoint = new Uri("http://mycustomendpoint.com");
EventHubOptions options = new EventHubOptions
{
ConnectionOptions = new EventHubConnectionOptions
{
CustomEndpointAddress = testEndpoint
},
RetryOptions = new EventHubsRetryOptions
{
MaximumRetries = 10
}
};

options.AddSender(expectedPathName, connectionString);

var configuration = CreateConfiguration();
var factory = new EventHubClientFactory(configuration, Mock.Of<AzureComponentFactory>(), Options.Create(options), new DefaultNameResolver(configuration));

var producer = factory.GetEventHubProducerClient(expectedPathName, null);
EventHubConnection connection = (EventHubConnection)typeof(EventHubProducerClient).GetProperty("Connection", BindingFlags.NonPublic | BindingFlags.Instance)
.GetValue(producer);
EventHubConnectionOptions connectionOptions = (EventHubConnectionOptions)typeof(EventHubConnection).GetProperty("Options", BindingFlags.NonPublic | BindingFlags.Instance).GetValue(connection);

Assert.AreEqual(testEndpoint, connectionOptions.CustomEndpointAddress);
Assert.AreEqual(expectedPathName, producer.EventHubName);

EventHubProducerClientOptions producerOptions = (EventHubProducerClientOptions)typeof(EventHubProducerClient).GetProperty("Options", BindingFlags.NonPublic | BindingFlags.Instance).GetValue(producer);
Assert.AreEqual(10, producerOptions.RetryOptions.MaximumRetries);
Assert.AreEqual(expectedPathName, producer.EventHubName);
}

[TestCase("k1", ConnectionString)]
[TestCase("path2", ConnectionStringWithEventHub)]
public void RespectsConnectionOptionsForConsumer(string expectedPathName, string connectionString)
{
var testEndpoint = new Uri("http://mycustomendpoint.com");
EventHubOptions options = new EventHubOptions
{
ConnectionOptions = new EventHubConnectionOptions
{
CustomEndpointAddress = testEndpoint
},
RetryOptions = new EventHubsRetryOptions
{
MaximumRetries = 10
}
};

options.AddReceiver(expectedPathName, connectionString);

var configuration = CreateConfiguration();
var factory = new EventHubClientFactory(configuration, Mock.Of<AzureComponentFactory>(), Options.Create(options), new DefaultNameResolver(configuration));

var consumer = factory.GetEventHubConsumerClient(expectedPathName, null, "consumer");
var consumerClient = (EventHubConsumerClient)typeof(EventHubConsumerClientImpl)
.GetField("_client", BindingFlags.NonPublic | BindingFlags.Instance)
.GetValue(consumer);
EventHubConnection connection = (EventHubConnection)typeof(EventHubConsumerClient)
.GetProperty("Connection", BindingFlags.NonPublic | BindingFlags.Instance)
.GetValue(consumerClient);
EventHubConnectionOptions connectionOptions = (EventHubConnectionOptions)typeof(EventHubConnection)
.GetProperty("Options", BindingFlags.NonPublic | BindingFlags.Instance)
.GetValue(connection);
Assert.AreEqual(testEndpoint, connectionOptions.CustomEndpointAddress);

EventHubsRetryPolicy retryPolicy = (EventHubsRetryPolicy)typeof(EventHubConsumerClient)
.GetProperty("RetryPolicy", BindingFlags.NonPublic | BindingFlags.Instance)
.GetValue(consumerClient);

// Reflection was still necessary here because BasicRetryOptions (which is the concrete derived type)
// is internal.
EventHubsRetryOptions retryOptions = (EventHubsRetryOptions)retryPolicy.GetType()
.GetProperty("Options", BindingFlags.Public | BindingFlags.Instance)
.GetValue(retryPolicy);
Assert.AreEqual(10, retryOptions.MaximumRetries);
Assert.AreEqual(expectedPathName, consumer.EventHubName);
}

[TestCase("k1", ConnectionString)]
[TestCase("path2", ConnectionStringWithEventHub)]
public void RespectsConnectionOptionsForProcessor(string expectedPathName, string connectionString)
{
var testEndpoint = new Uri("http://mycustomendpoint.com");
EventHubOptions options = new EventHubOptions
{
ConnectionOptions = new EventHubConnectionOptions
{
CustomEndpointAddress = testEndpoint
},
RetryOptions = new EventHubsRetryOptions
{
MaximumRetries = 10
}
};

options.AddReceiver(expectedPathName, connectionString);

var configuration = CreateConfiguration();
var factory = new EventHubClientFactory(configuration, Mock.Of<AzureComponentFactory>(), Options.Create(options), new DefaultNameResolver(configuration));

var processor = factory.GetEventProcessorHost(expectedPathName, null, "consumer");
EventProcessorOptions processorOptions = (EventProcessorOptions)typeof(EventProcessor<EventProcessorHostPartition>)
.GetProperty("Options", BindingFlags.NonPublic | BindingFlags.Instance)
.GetValue(processor);
Assert.AreEqual(testEndpoint, processorOptions.ConnectionOptions.CustomEndpointAddress);

Assert.AreEqual(10, processorOptions.RetryOptions.MaximumRetries);
Assert.AreEqual(expectedPathName, processor.EventHubName);
}

private IConfiguration CreateConfiguration(params KeyValuePair<string, string>[] data)
{
return new ConfigurationBuilder().AddInMemoryCollection(data).Build();
Expand Down

0 comments on commit 3d1ab95

Please sign in to comment.