diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/CHANGELOG.md b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/CHANGELOG.md index bfcdf8c9c88a..29a120c5f694 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/CHANGELOG.md +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/CHANGELOG.md @@ -1,5 +1,15 @@ # Release History +## 5.14.0-beta.1 (Unreleased) + +### Features Added + +### Breaking Changes + +### Bugs Fixed + +### Other Changes + ## 5.13.0 (2023-10-11) ### Features Added diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusHostBuilderExtensions.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusHostBuilderExtensions.cs index a36eb6143ce3..713172392493 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusHostBuilderExtensions.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusHostBuilderExtensions.cs @@ -110,6 +110,7 @@ public static IWebJobsBuilder AddServiceBus(this IWebJobsBuilder builder, Action builder.Services.AddAzureClientsCore(); builder.Services.TryAddSingleton(); + builder.Services.AddHostedService(); builder.Services.AddSingleton(); #if NET6_0_OR_GREATER builder.Services.AddSingleton(); diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs index 51515f33a3c8..f191204c7f7b 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs @@ -305,12 +305,8 @@ public void Dispose() _stoppingCancellationTokenSource.Cancel(); _functionExecutionCancellationTokenSource.Cancel(); - if (_batchReceiver.IsValueCreated) - { -#pragma warning disable AZC0102 // Do not use GetAwaiter().GetResult(). - _batchReceiver.Value.CloseAsync(CancellationToken.None).GetAwaiter().GetResult(); -#pragma warning restore AZC0102 // Do not use GetAwaiter().GetResult(). - } + // ServiceBusClient and receivers will be disposed in CachedClientCleanupService, so as not to dispose it while it's still in + // use by other listeners. Processors are disposed here as they cannot be shared amongst listeners. if (_messageProcessor.IsValueCreated) { @@ -326,13 +322,6 @@ public void Dispose() #pragma warning restore AZC0102 // Do not use GetAwaiter().GetResult(). } - if (_client.IsValueCreated) - { -#pragma warning disable AZC0102 // Do not use GetAwaiter().GetResult(). - _client.Value.DisposeAsync().AsTask().GetAwaiter().GetResult(); -#pragma warning restore AZC0102 // Do not use GetAwaiter().GetResult(). - } - _stopAsyncSemaphore.Dispose(); _stoppingCancellationTokenSource.Dispose(); _concurrencyUpdateManager?.Dispose(); diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj index 64c5af298d6f..77ac3d72bb2b 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj @@ -3,10 +3,10 @@ netstandard2.0;net6.0 Microsoft Azure WebJobs SDK ServiceBus Extension - 5.13.0 + 5.14.0-beta.1 - 5.12.0 + 5.13.0 $(NoWarn);AZC0001;CS1591;SA1636;AZC0007;AZC0015 true true @@ -48,10 +48,4 @@ - - - - - - diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/CachedClientCleanupService.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/CachedClientCleanupService.cs new file mode 100644 index 000000000000..635d2e390e7b --- /dev/null +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/CachedClientCleanupService.cs @@ -0,0 +1,46 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Hosting; + +namespace Microsoft.Azure.WebJobs.ServiceBus +{ + internal class CachedClientCleanupService : IHostedService + { + private readonly MessagingProvider _provider; + + public CachedClientCleanupService(MessagingProvider provider) + { + _provider = provider; + } + + public Task StartAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + + public async Task StopAsync(CancellationToken cancellationToken) + { + foreach (var receiver in _provider.MessageReceiverCache.Values) + { + await receiver.DisposeAsync().ConfigureAwait(false); + } + _provider.MessageReceiverCache.Clear(); + + foreach (var sender in _provider.MessageSenderCache.Values) + { + await sender.DisposeAsync().ConfigureAwait(false); + } + _provider.MessageSenderCache.Clear(); + + foreach (var client in _provider.ClientCache.Values) + { + await client.DisposeAsync().ConfigureAwait(false); + } + _provider.ClientCache.Clear(); + _provider.ActionsCache.Clear(); + } + } +} \ No newline at end of file diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessagingProvider.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessagingProvider.cs index 413be9766885..84af7e458574 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessagingProvider.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Primitives/MessagingProvider.cs @@ -19,9 +19,9 @@ public class MessagingProvider { internal ServiceBusOptions Options { get; } - private readonly ConcurrentDictionary _messageSenderCache = new(); - private readonly ConcurrentDictionary _messageReceiverCache = new(); - private readonly ConcurrentDictionary _clientCache = new(); + internal ConcurrentDictionary MessageSenderCache { get; } = new(); + internal ConcurrentDictionary MessageReceiverCache { get; } = new(); + internal ConcurrentDictionary ClientCache { get; } = new(); internal ConcurrentDictionary ActionsCache { get; } = new(); /// @@ -49,7 +49,7 @@ protected internal virtual ServiceBusClient CreateClient(string connectionString Argument.AssertNotNullOrEmpty(connectionString, nameof(connectionString)); Argument.AssertNotNull(options, nameof(options)); - return _clientCache.GetOrAdd( + return ClientCache.GetOrAdd( connectionString, (_) => new ServiceBusClient(connectionString, options)); } @@ -71,7 +71,7 @@ protected internal virtual ServiceBusClient CreateClient(string fullyQualifiedNa Argument.AssertNotNull(credential, nameof(credential)); Argument.AssertNotNull(options, nameof(options)); - return _clientCache.GetOrAdd( + return ClientCache.GetOrAdd( fullyQualifiedNamespace, (_) => new ServiceBusClient(fullyQualifiedNamespace, credential, options)); } @@ -140,7 +140,7 @@ protected internal virtual ServiceBusSender CreateMessageSender(ServiceBusClient Argument.AssertNotNull(client, nameof(client)); Argument.AssertNotNullOrEmpty(entityPath, nameof(entityPath)); - return _messageSenderCache.GetOrAdd(GenerateCacheKey(client.FullyQualifiedNamespace, entityPath), client.CreateSender(entityPath)); + return MessageSenderCache.GetOrAdd(GenerateCacheKey(client.FullyQualifiedNamespace, entityPath), client.CreateSender(entityPath)); } /// @@ -159,7 +159,7 @@ protected internal virtual ServiceBusReceiver CreateBatchMessageReceiver(Service Argument.AssertNotNullOrEmpty(entityPath, nameof(entityPath)); Argument.AssertNotNull(options, nameof(options)); - return _messageReceiverCache.GetOrAdd(GenerateCacheKey(client.FullyQualifiedNamespace, entityPath), (_) => client.CreateReceiver(entityPath, options)); + return MessageReceiverCache.GetOrAdd(GenerateCacheKey(client.FullyQualifiedNamespace, entityPath), (_) => client.CreateReceiver(entityPath, options)); } /// diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/WebJobsServiceBusTestBase.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/WebJobsServiceBusTestBase.cs index 3b04778b1965..5353e6ef5cd1 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/WebJobsServiceBusTestBase.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/WebJobsServiceBusTestBase.cs @@ -14,6 +14,7 @@ using Azure.Messaging.ServiceBus.Administration; using Azure.Messaging.ServiceBus.Tests; using Microsoft.Azure.WebJobs.Host.TestCommon; +using Microsoft.Azure.WebJobs.ServiceBus; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; @@ -335,6 +336,12 @@ public async Task StopAsync(CancellationToken cancellationToken) QueueRuntimeProperties properties = await client.GetQueueRuntimePropertiesAsync(FirstQueueScope.QueueName, CancellationToken.None); Assert.AreEqual(ExpectedRemainingMessages, properties.ActiveMessageCount); + + var provider = _host.Services.GetService(); + Assert.AreEqual(0, provider.ClientCache.Count); + Assert.AreEqual(0, provider.MessageReceiverCache.Count); + Assert.AreEqual(0, provider.MessageSenderCache.Count); + Assert.AreEqual(0, provider.ActionsCache.Count); } private static bool IsError(LogMessage logMessage)