-
Notifications
You must be signed in to change notification settings - Fork 4.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
API updates to support function level autocomplete #21181
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,7 @@ | |
using Microsoft.Extensions.Azure; | ||
using Microsoft.Extensions.Configuration; | ||
using System; | ||
using Microsoft.Extensions.Options; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (extreme) nit: We should consider sorting the usings. |
||
using Constants = Microsoft.Azure.WebJobs.ServiceBus.Constants; | ||
|
||
namespace Microsoft.Azure.WebJobs.Extensions.ServiceBus.Config | ||
|
@@ -18,25 +19,28 @@ internal class ServiceBusClientFactory | |
private readonly IConfiguration _configuration; | ||
private readonly AzureComponentFactory _componentFactory; | ||
private readonly MessagingProvider _messagingProvider; | ||
private readonly ServiceBusOptions _options; | ||
|
||
public ServiceBusClientFactory( | ||
IConfiguration configuration, | ||
AzureComponentFactory componentFactory, | ||
MessagingProvider messagingProvider, | ||
AzureEventSourceLogForwarder logForwarder) | ||
AzureEventSourceLogForwarder logForwarder, | ||
IOptions<ServiceBusOptions> options) | ||
{ | ||
_configuration = configuration; | ||
_componentFactory = componentFactory; | ||
_messagingProvider = messagingProvider; | ||
_options = options?.Value ?? throw new ArgumentNullException(nameof(options)); | ||
logForwarder.Start(); | ||
} | ||
|
||
internal ServiceBusClient CreateClientFromSetting(string connection) | ||
{ | ||
var connectionInfo = ResolveConnectionInformation(connection); | ||
|
||
return connectionInfo.ConnectionString != null ? _messagingProvider.CreateClient(connectionInfo.ConnectionString) | ||
: _messagingProvider.CreateClient(connectionInfo.FullyQualifiedNamespace, connectionInfo.Credential); | ||
return connectionInfo.ConnectionString != null ? _messagingProvider.CreateClient(connectionInfo.ConnectionString, _options.ToClientOptions()) | ||
: _messagingProvider.CreateClient(connectionInfo.FullyQualifiedNamespace, connectionInfo.Credential, _options.ToClientOptions()); | ||
} | ||
|
||
internal ServiceBusAdministrationClient CreateAdministrationClient(string connection) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,7 +27,7 @@ internal sealed class ServiceBusListener : IListener, IScaleMonitorProvider | |
private readonly MessagingProvider _messagingProvider; | ||
private readonly ITriggeredFunctionExecutor _triggerExecutor; | ||
private readonly string _functionId; | ||
private readonly ServiceBusEntityType _serviceBusEntityType; | ||
private readonly ServiceBusEntityType _entityType; | ||
private readonly string _entityPath; | ||
private readonly bool _isSessionsEnabled; | ||
private readonly CancellationTokenSource _cancellationTokenSource; | ||
|
@@ -49,7 +49,7 @@ internal sealed class ServiceBusListener : IListener, IScaleMonitorProvider | |
|
||
public ServiceBusListener( | ||
string functionId, | ||
ServiceBusEntityType serviceBusEntityType, | ||
ServiceBusEntityType entityType, | ||
string entityPath, | ||
bool isSessionsEnabled, | ||
ITriggeredFunctionExecutor triggerExecutor, | ||
|
@@ -61,7 +61,7 @@ public ServiceBusListener( | |
ServiceBusClientFactory clientFactory) | ||
{ | ||
_functionId = functionId; | ||
_serviceBusEntityType = serviceBusEntityType; | ||
_entityType = entityType; | ||
_entityPath = entityPath; | ||
_isSessionsEnabled = isSessionsEnabled; | ||
_triggerExecutor = triggerExecutor; | ||
|
@@ -70,12 +70,38 @@ public ServiceBusListener( | |
_loggerFactory = loggerFactory; | ||
_logger = loggerFactory.CreateLogger<ServiceBusListener>(); | ||
|
||
_client = new Lazy<ServiceBusClient>(() => clientFactory.CreateClientFromSetting(connection)); | ||
_batchReceiver = new Lazy<ServiceBusReceiver>(() => _messagingProvider.CreateBatchMessageReceiver(_client.Value, _entityPath)); | ||
_messageProcessor = new Lazy<MessageProcessor>(() => _messagingProvider.CreateMessageProcessor(_client.Value, _entityPath)); | ||
_sessionMessageProcessor = new Lazy<SessionMessageProcessor>(() => _messagingProvider.CreateSessionMessageProcessor(_client.Value, _entityPath)); | ||
_client = new Lazy<ServiceBusClient>( | ||
() => | ||
clientFactory.CreateClientFromSetting(connection)); | ||
|
||
_batchReceiver = new Lazy<ServiceBusReceiver>( | ||
() => _messagingProvider.CreateBatchMessageReceiver( | ||
_client.Value, | ||
_entityPath, | ||
options.ToReceiverOptions())); | ||
|
||
_messageProcessor = new Lazy<MessageProcessor>( | ||
() => _messagingProvider.CreateMessageProcessor( | ||
_client.Value, | ||
_entityPath, | ||
options.ToProcessorOptions())); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This would be updated to take the AutoComplete property from ServiceBusTriggerAttribute into account, once the feature is added. |
||
|
||
_sessionMessageProcessor = new Lazy<SessionMessageProcessor>( | ||
() => _messagingProvider.CreateSessionMessageProcessor( | ||
_client.Value, | ||
_entityPath, | ||
options.ToSessionProcessorOptions())); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This would be updated to take the AutoComplete property from ServiceBusTriggerAttribute into account, once the feature is added. |
||
|
||
_scaleMonitor = new Lazy<ServiceBusScaleMonitor>( | ||
() => new ServiceBusScaleMonitor( | ||
_functionId, | ||
_entityType, | ||
_entityPath, | ||
connection, | ||
_batchReceiver, | ||
_loggerFactory, | ||
clientFactory)); | ||
|
||
_scaleMonitor = new Lazy<ServiceBusScaleMonitor>(() => new ServiceBusScaleMonitor(_functionId, _serviceBusEntityType, _entityPath, connection, _batchReceiver, _loggerFactory, clientFactory)); | ||
_singleDispatch = singleDispatch; | ||
_serviceBusOptions = options; | ||
} | ||
|
@@ -266,7 +292,7 @@ internal void StartMessageBatchReceiver(CancellationToken cancellationToken) | |
} | ||
IReadOnlyList<ServiceBusReceivedMessage> messages = | ||
await receiver.ReceiveMessagesAsync(_serviceBusOptions.MaxMessages).ConfigureAwait(false); | ||
await receiver.ReceiveMessagesAsync(_serviceBusOptions.MaxBatchSize).ConfigureAwait(false); | ||
if (messages != null) | ||
{ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This matches the access modifier of the storage queue processor methods - https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/src/Shared/Queues/QueueProcessor.cs#L66