Skip to content

Commit

Permalink
API updates to support function level autocomplete (#21181)
Browse files Browse the repository at this point in the history
* API updates to support function level autocomplete.

* Fix test

* Fix docs
  • Loading branch information
JoshLove-msft authored May 20, 2021
1 parent 5696f9c commit c3ceaaf
Show file tree
Hide file tree
Showing 14 changed files with 241 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,20 @@ public partial class MessageProcessor
{
public MessageProcessor(Azure.Messaging.ServiceBus.ServiceBusProcessor processor) { }
protected internal Azure.Messaging.ServiceBus.ServiceBusProcessor Processor { get { throw null; } }
public virtual System.Threading.Tasks.Task<bool> BeginProcessingMessageAsync(Microsoft.Azure.WebJobs.ServiceBus.ServiceBusMessageActions messageActions, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Threading.CancellationToken cancellationToken) { throw null; }
public virtual System.Threading.Tasks.Task CompleteProcessingMessageAsync(Microsoft.Azure.WebJobs.ServiceBus.ServiceBusMessageActions messageActions, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, Microsoft.Azure.WebJobs.Host.Executors.FunctionResult result, System.Threading.CancellationToken cancellationToken) { throw null; }
protected internal virtual System.Threading.Tasks.Task<bool> BeginProcessingMessageAsync(Microsoft.Azure.WebJobs.ServiceBus.ServiceBusMessageActions messageActions, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Threading.CancellationToken cancellationToken) { throw null; }
protected internal virtual System.Threading.Tasks.Task CompleteProcessingMessageAsync(Microsoft.Azure.WebJobs.ServiceBus.ServiceBusMessageActions messageActions, Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, Microsoft.Azure.WebJobs.Host.Executors.FunctionResult result, System.Threading.CancellationToken cancellationToken) { throw null; }
}
public partial class MessagingProvider
{
protected MessagingProvider() { }
public MessagingProvider(Microsoft.Extensions.Options.IOptions<Microsoft.Azure.WebJobs.ServiceBus.ServiceBusOptions> options) { }
public virtual Azure.Messaging.ServiceBus.ServiceBusReceiver CreateBatchMessageReceiver(Azure.Messaging.ServiceBus.ServiceBusClient client, string entityPath) { throw null; }
public virtual Azure.Messaging.ServiceBus.ServiceBusClient CreateClient(string connectionString) { throw null; }
public virtual Azure.Messaging.ServiceBus.ServiceBusClient CreateClient(string fullyQualifiedNamespace, Azure.Core.TokenCredential credential) { throw null; }
public virtual Microsoft.Azure.WebJobs.ServiceBus.MessageProcessor CreateMessageProcessor(Azure.Messaging.ServiceBus.ServiceBusClient client, string entityPath) { throw null; }
public virtual Azure.Messaging.ServiceBus.ServiceBusSender CreateMessageSender(Azure.Messaging.ServiceBus.ServiceBusClient client, string entityPath) { throw null; }
public virtual Azure.Messaging.ServiceBus.ServiceBusProcessor CreateProcessor(Azure.Messaging.ServiceBus.ServiceBusClient client, string entityPath) { throw null; }
public virtual Microsoft.Azure.WebJobs.ServiceBus.SessionMessageProcessor CreateSessionMessageProcessor(Azure.Messaging.ServiceBus.ServiceBusClient client, string entityPath) { throw null; }
public virtual Azure.Messaging.ServiceBus.ServiceBusSessionProcessor CreateSessionProcessor(Azure.Messaging.ServiceBus.ServiceBusClient client, string entityPath) { throw null; }
protected internal virtual Azure.Messaging.ServiceBus.ServiceBusReceiver CreateBatchMessageReceiver(Azure.Messaging.ServiceBus.ServiceBusClient client, string entityPath, Azure.Messaging.ServiceBus.ServiceBusReceiverOptions options) { throw null; }
protected internal virtual Azure.Messaging.ServiceBus.ServiceBusClient CreateClient(string fullyQualifiedNamespace, Azure.Core.TokenCredential credential, Azure.Messaging.ServiceBus.ServiceBusClientOptions options) { throw null; }
protected internal virtual Azure.Messaging.ServiceBus.ServiceBusClient CreateClient(string connectionString, Azure.Messaging.ServiceBus.ServiceBusClientOptions options) { throw null; }
protected internal virtual Microsoft.Azure.WebJobs.ServiceBus.MessageProcessor CreateMessageProcessor(Azure.Messaging.ServiceBus.ServiceBusClient client, string entityPath, Azure.Messaging.ServiceBus.ServiceBusProcessorOptions options) { throw null; }
protected internal virtual Azure.Messaging.ServiceBus.ServiceBusSender CreateMessageSender(Azure.Messaging.ServiceBus.ServiceBusClient client, string entityPath) { throw null; }
protected internal virtual Azure.Messaging.ServiceBus.ServiceBusProcessor CreateProcessor(Azure.Messaging.ServiceBus.ServiceBusClient client, string entityPath, Azure.Messaging.ServiceBus.ServiceBusProcessorOptions options) { throw null; }
protected internal virtual Microsoft.Azure.WebJobs.ServiceBus.SessionMessageProcessor CreateSessionMessageProcessor(Azure.Messaging.ServiceBus.ServiceBusClient client, string entityPath, Azure.Messaging.ServiceBus.ServiceBusSessionProcessorOptions options) { throw null; }
protected internal virtual Azure.Messaging.ServiceBus.ServiceBusSessionProcessor CreateSessionProcessor(Azure.Messaging.ServiceBus.ServiceBusClient client, string entityPath, Azure.Messaging.ServiceBus.ServiceBusSessionProcessorOptions options) { throw null; }
}
public enum ServiceBusEntityType
{
Expand All @@ -77,9 +76,9 @@ public ServiceBusOptions() { }
public System.Func<Azure.Messaging.ServiceBus.ProcessErrorEventArgs, System.Threading.Tasks.Task> ExceptionHandler { get { throw null; } set { } }
public Newtonsoft.Json.JsonSerializerSettings JsonSerializerSettings { get { throw null; } set { } }
public System.TimeSpan MaxAutoLockRenewalDuration { get { throw null; } set { } }
public int MaxBatchSize { get { throw null; } set { } }
public int MaxConcurrentCalls { get { throw null; } set { } }
public int MaxConcurrentSessions { get { throw null; } set { } }
public int MaxMessages { get { throw null; } set { } }
public int PrefetchCount { get { throw null; } set { } }
public System.TimeSpan? SessionIdleTimeout { get { throw null; } set { } }
public Azure.Messaging.ServiceBus.ServiceBusTransportType TransportType { get { throw null; } set { } }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.Configuration;
using System;
using Microsoft.Extensions.Options;
using Constants = Microsoft.Azure.WebJobs.ServiceBus.Constants;

namespace Microsoft.Azure.WebJobs.Extensions.ServiceBus.Config
Expand All @@ -18,25 +19,28 @@ internal class ServiceBusClientFactory
private readonly IConfiguration _configuration;
private readonly AzureComponentFactory _componentFactory;
private readonly MessagingProvider _messagingProvider;
private readonly ServiceBusOptions _options;

public ServiceBusClientFactory(
IConfiguration configuration,
AzureComponentFactory componentFactory,
MessagingProvider messagingProvider,
AzureEventSourceLogForwarder logForwarder)
AzureEventSourceLogForwarder logForwarder,
IOptions<ServiceBusOptions> options)
{
_configuration = configuration;
_componentFactory = componentFactory;
_messagingProvider = messagingProvider;
_options = options?.Value ?? throw new ArgumentNullException(nameof(options));
logForwarder.Start();
}

internal ServiceBusClient CreateClientFromSetting(string connection)
{
var connectionInfo = ResolveConnectionInformation(connection);

return connectionInfo.ConnectionString != null ? _messagingProvider.CreateClient(connectionInfo.ConnectionString)
: _messagingProvider.CreateClient(connectionInfo.FullyQualifiedNamespace, connectionInfo.Credential);
return connectionInfo.ConnectionString != null ? _messagingProvider.CreateClient(connectionInfo.ConnectionString, _options.ToClientOptions())
: _messagingProvider.CreateClient(connectionInfo.FullyQualifiedNamespace, connectionInfo.Credential, _options.ToClientOptions());
}

internal ServiceBusAdministrationClient CreateAdministrationClient(string connection)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public int MaxConcurrentSessions
/// Gets or sets the maximum number of messages that will be passed to each function call. This only applies for functions that receive
/// a batch of messages. The default value is 1000.
/// </summary>
public int MaxMessages { get; set; } = 1000;
public int MaxBatchSize { get; set; } = 1000;

/// <summary>
/// Gets or sets the maximum amount of time to wait for a message to be received for the
Expand Down Expand Up @@ -179,7 +179,7 @@ string IOptionsFormatter.Format()
{ nameof(MaxAutoLockRenewalDuration), MaxAutoLockRenewalDuration },
{ nameof(MaxConcurrentCalls), MaxConcurrentCalls },
{ nameof(MaxConcurrentSessions), MaxConcurrentSessions },
{ nameof(MaxMessages), MaxMessages },
{ nameof(MaxBatchSize), MaxBatchSize },
{ nameof(SessionIdleTimeout), SessionIdleTimeout.ToString() ?? string.Empty }
};

Expand Down Expand Up @@ -212,6 +212,12 @@ internal ServiceBusSessionProcessorOptions ToSessionProcessorOptions() =>
SessionIdleTimeout = SessionIdleTimeout
};

internal ServiceBusReceiverOptions ToReceiverOptions() =>
new ServiceBusReceiverOptions
{
PrefetchCount = PrefetchCount
};

internal ServiceBusClientOptions ToClientOptions() =>
new ServiceBusClientOptions
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ internal sealed class ServiceBusListener : IListener, IScaleMonitorProvider
private readonly MessagingProvider _messagingProvider;
private readonly ITriggeredFunctionExecutor _triggerExecutor;
private readonly string _functionId;
private readonly ServiceBusEntityType _serviceBusEntityType;
private readonly ServiceBusEntityType _entityType;
private readonly string _entityPath;
private readonly bool _isSessionsEnabled;
private readonly CancellationTokenSource _cancellationTokenSource;
Expand All @@ -49,7 +49,7 @@ internal sealed class ServiceBusListener : IListener, IScaleMonitorProvider

public ServiceBusListener(
string functionId,
ServiceBusEntityType serviceBusEntityType,
ServiceBusEntityType entityType,
string entityPath,
bool isSessionsEnabled,
ITriggeredFunctionExecutor triggerExecutor,
Expand All @@ -61,7 +61,7 @@ public ServiceBusListener(
ServiceBusClientFactory clientFactory)
{
_functionId = functionId;
_serviceBusEntityType = serviceBusEntityType;
_entityType = entityType;
_entityPath = entityPath;
_isSessionsEnabled = isSessionsEnabled;
_triggerExecutor = triggerExecutor;
Expand All @@ -70,12 +70,38 @@ public ServiceBusListener(
_loggerFactory = loggerFactory;
_logger = loggerFactory.CreateLogger<ServiceBusListener>();

_client = new Lazy<ServiceBusClient>(() => clientFactory.CreateClientFromSetting(connection));
_batchReceiver = new Lazy<ServiceBusReceiver>(() => _messagingProvider.CreateBatchMessageReceiver(_client.Value, _entityPath));
_messageProcessor = new Lazy<MessageProcessor>(() => _messagingProvider.CreateMessageProcessor(_client.Value, _entityPath));
_sessionMessageProcessor = new Lazy<SessionMessageProcessor>(() => _messagingProvider.CreateSessionMessageProcessor(_client.Value, _entityPath));
_client = new Lazy<ServiceBusClient>(
() =>
clientFactory.CreateClientFromSetting(connection));

_batchReceiver = new Lazy<ServiceBusReceiver>(
() => _messagingProvider.CreateBatchMessageReceiver(
_client.Value,
_entityPath,
options.ToReceiverOptions()));

_messageProcessor = new Lazy<MessageProcessor>(
() => _messagingProvider.CreateMessageProcessor(
_client.Value,
_entityPath,
options.ToProcessorOptions()));

_sessionMessageProcessor = new Lazy<SessionMessageProcessor>(
() => _messagingProvider.CreateSessionMessageProcessor(
_client.Value,
_entityPath,
options.ToSessionProcessorOptions()));

_scaleMonitor = new Lazy<ServiceBusScaleMonitor>(
() => new ServiceBusScaleMonitor(
_functionId,
_entityType,
_entityPath,
connection,
_batchReceiver,
_loggerFactory,
clientFactory));

_scaleMonitor = new Lazy<ServiceBusScaleMonitor>(() => new ServiceBusScaleMonitor(_functionId, _serviceBusEntityType, _entityPath, connection, _batchReceiver, _loggerFactory, clientFactory));
_singleDispatch = singleDispatch;
_serviceBusOptions = options;
}
Expand Down Expand Up @@ -266,7 +292,7 @@ internal void StartMessageBatchReceiver(CancellationToken cancellationToken)
}
IReadOnlyList<ServiceBusReceivedMessage> messages =
await receiver.ReceiveMessagesAsync(_serviceBusOptions.MaxMessages).ConfigureAwait(false);
await receiver.ReceiveMessagesAsync(_serviceBusOptions.MaxBatchSize).ConfigureAwait(false);
if (messages != null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public MessageProcessor(ServiceBusProcessor processor)
/// <param name="message">The <see cref="ServiceBusReceivedMessage"/> to process.</param>
/// <param name="cancellationToken">A cancellation token that will be cancelled when the processor is shutting down.</param>
/// <returns>A <see cref="Task"/> that returns true if the message processing should continue, false otherwise.</returns>
public virtual Task<bool> BeginProcessingMessageAsync(ServiceBusMessageActions messageActions, ServiceBusReceivedMessage message, CancellationToken cancellationToken)
protected internal virtual Task<bool> BeginProcessingMessageAsync(ServiceBusMessageActions messageActions, ServiceBusReceivedMessage message, CancellationToken cancellationToken)
{
return Task.FromResult<bool>(true);
}
Expand All @@ -54,7 +54,7 @@ public virtual Task<bool> BeginProcessingMessageAsync(ServiceBusMessageActions m
/// <param name="result">The <see cref="FunctionResult"/> from the job invocation.</param>
/// <param name="cancellationToken">A cancellation token that will be cancelled when the processor is shutting down.</param>
/// <returns>A <see cref="Task"/> that will complete the message processing.</returns>
public virtual Task CompleteProcessingMessageAsync(ServiceBusMessageActions messageActions, ServiceBusReceivedMessage message, FunctionResult result, CancellationToken cancellationToken)
protected internal virtual Task CompleteProcessingMessageAsync(ServiceBusMessageActions messageActions, ServiceBusReceivedMessage message, FunctionResult result, CancellationToken cancellationToken)
{
if (message is null)
{
Expand Down
Loading

0 comments on commit c3ceaaf

Please sign in to comment.