Skip to content

Commit

Permalink
Dispose clients on host shutdown rather than listener dispose (Azure#…
Browse files Browse the repository at this point in the history
…39225)

* Dispose clients on host shutdown rather than listener dispose

* Remove project reference

* Update version - for some reason the auto-update job failed
  • Loading branch information
JoshLove-msft authored and matthohn-msft committed Oct 27, 2023
1 parent bd9f75a commit aef8687
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# Release History

## 5.14.0-beta.1 (Unreleased)

### Features Added

### Breaking Changes

### Bugs Fixed

### Other Changes

## 5.13.0 (2023-10-11)

### Features Added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public static IWebJobsBuilder AddServiceBus(this IWebJobsBuilder builder, Action

builder.Services.AddAzureClientsCore();
builder.Services.TryAddSingleton<MessagingProvider>();
builder.Services.AddHostedService<CachedClientCleanupService>();
builder.Services.AddSingleton<ServiceBusClientFactory>();
#if NET6_0_OR_GREATER
builder.Services.AddSingleton<SettlementService>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,12 +305,8 @@ public void Dispose()
_stoppingCancellationTokenSource.Cancel();
_functionExecutionCancellationTokenSource.Cancel();

if (_batchReceiver.IsValueCreated)
{
#pragma warning disable AZC0102 // Do not use GetAwaiter().GetResult().
_batchReceiver.Value.CloseAsync(CancellationToken.None).GetAwaiter().GetResult();
#pragma warning restore AZC0102 // Do not use GetAwaiter().GetResult().
}
// ServiceBusClient and receivers will be disposed in CachedClientCleanupService, so as not to dispose it while it's still in
// use by other listeners. Processors are disposed here as they cannot be shared amongst listeners.

if (_messageProcessor.IsValueCreated)
{
Expand All @@ -326,13 +322,6 @@ public void Dispose()
#pragma warning restore AZC0102 // Do not use GetAwaiter().GetResult().
}

if (_client.IsValueCreated)
{
#pragma warning disable AZC0102 // Do not use GetAwaiter().GetResult().
_client.Value.DisposeAsync().AsTask().GetAwaiter().GetResult();
#pragma warning restore AZC0102 // Do not use GetAwaiter().GetResult().
}

_stopAsyncSemaphore.Dispose();
_stoppingCancellationTokenSource.Dispose();
_concurrencyUpdateManager?.Dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;net6.0</TargetFrameworks>
<Description>Microsoft Azure WebJobs SDK ServiceBus Extension</Description>
<Version>5.13.0</Version>
<Version>5.14.0-beta.1</Version>
<!--The ApiCompatVersion is managed automatically and should not generally be modified manually.-->
<!--Since we are adding a new target for net6.0, we need to temporarily condition on netstandard-->
<ApiCompatVersion Condition="'$(TargetFramework)' == 'netstandard2.0'">5.12.0</ApiCompatVersion>
<ApiCompatVersion>5.13.0</ApiCompatVersion>
<NoWarn>$(NoWarn);AZC0001;CS1591;SA1636;AZC0007;AZC0015</NoWarn>
<SignAssembly>true</SignAssembly>
<IsExtensionClientLibrary>true</IsExtensionClientLibrary>
Expand Down Expand Up @@ -48,10 +48,4 @@
<Compile Include="$(AzureCoreSharedSources)MessagingClientDiagnostics.cs" LinkBase="Shared" />
<Compile Include="$(AzureCoreSharedSources)MessagingDiagnosticOperation.cs" LinkBase="Shared" />
</ItemGroup>

<!--Temp - remove after next release-->
<ItemGroup>
<ProjectReference Include="..\..\Azure.Messaging.ServiceBus\src\Azure.Messaging.ServiceBus.csproj" />
</ItemGroup>
<!--End Temp-->
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;

namespace Microsoft.Azure.WebJobs.ServiceBus
{
internal class CachedClientCleanupService : IHostedService
{
private readonly MessagingProvider _provider;

public CachedClientCleanupService(MessagingProvider provider)
{
_provider = provider;
}

public Task StartAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}

public async Task StopAsync(CancellationToken cancellationToken)
{
foreach (var receiver in _provider.MessageReceiverCache.Values)
{
await receiver.DisposeAsync().ConfigureAwait(false);
}
_provider.MessageReceiverCache.Clear();

foreach (var sender in _provider.MessageSenderCache.Values)
{
await sender.DisposeAsync().ConfigureAwait(false);
}
_provider.MessageSenderCache.Clear();

foreach (var client in _provider.ClientCache.Values)
{
await client.DisposeAsync().ConfigureAwait(false);
}
_provider.ClientCache.Clear();
_provider.ActionsCache.Clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ public class MessagingProvider
{
internal ServiceBusOptions Options { get; }

private readonly ConcurrentDictionary<string, ServiceBusSender> _messageSenderCache = new();
private readonly ConcurrentDictionary<string, ServiceBusReceiver> _messageReceiverCache = new();
private readonly ConcurrentDictionary<string, ServiceBusClient> _clientCache = new();
internal ConcurrentDictionary<string, ServiceBusSender> MessageSenderCache { get; } = new();
internal ConcurrentDictionary<string, ServiceBusReceiver> MessageReceiverCache { get; } = new();
internal ConcurrentDictionary<string, ServiceBusClient> ClientCache { get; } = new();
internal ConcurrentDictionary<string, (ServiceBusReceivedMessage Message, ServiceBusMessageActions Actions)> ActionsCache { get; } = new();

/// <summary>
Expand Down Expand Up @@ -49,7 +49,7 @@ protected internal virtual ServiceBusClient CreateClient(string connectionString
Argument.AssertNotNullOrEmpty(connectionString, nameof(connectionString));
Argument.AssertNotNull(options, nameof(options));

return _clientCache.GetOrAdd(
return ClientCache.GetOrAdd(
connectionString,
(_) => new ServiceBusClient(connectionString, options));
}
Expand All @@ -71,7 +71,7 @@ protected internal virtual ServiceBusClient CreateClient(string fullyQualifiedNa
Argument.AssertNotNull(credential, nameof(credential));
Argument.AssertNotNull(options, nameof(options));

return _clientCache.GetOrAdd(
return ClientCache.GetOrAdd(
fullyQualifiedNamespace,
(_) => new ServiceBusClient(fullyQualifiedNamespace, credential, options));
}
Expand Down Expand Up @@ -140,7 +140,7 @@ protected internal virtual ServiceBusSender CreateMessageSender(ServiceBusClient
Argument.AssertNotNull(client, nameof(client));
Argument.AssertNotNullOrEmpty(entityPath, nameof(entityPath));

return _messageSenderCache.GetOrAdd(GenerateCacheKey(client.FullyQualifiedNamespace, entityPath), client.CreateSender(entityPath));
return MessageSenderCache.GetOrAdd(GenerateCacheKey(client.FullyQualifiedNamespace, entityPath), client.CreateSender(entityPath));
}

/// <summary>
Expand All @@ -159,7 +159,7 @@ protected internal virtual ServiceBusReceiver CreateBatchMessageReceiver(Service
Argument.AssertNotNullOrEmpty(entityPath, nameof(entityPath));
Argument.AssertNotNull(options, nameof(options));

return _messageReceiverCache.GetOrAdd(GenerateCacheKey(client.FullyQualifiedNamespace, entityPath), (_) => client.CreateReceiver(entityPath, options));
return MessageReceiverCache.GetOrAdd(GenerateCacheKey(client.FullyQualifiedNamespace, entityPath), (_) => client.CreateReceiver(entityPath, options));
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using Azure.Messaging.ServiceBus.Administration;
using Azure.Messaging.ServiceBus.Tests;
using Microsoft.Azure.WebJobs.Host.TestCommon;
using Microsoft.Azure.WebJobs.ServiceBus;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
Expand Down Expand Up @@ -335,6 +336,12 @@ public async Task StopAsync(CancellationToken cancellationToken)

QueueRuntimeProperties properties = await client.GetQueueRuntimePropertiesAsync(FirstQueueScope.QueueName, CancellationToken.None);
Assert.AreEqual(ExpectedRemainingMessages, properties.ActiveMessageCount);

var provider = _host.Services.GetService<MessagingProvider>();
Assert.AreEqual(0, provider.ClientCache.Count);
Assert.AreEqual(0, provider.MessageReceiverCache.Count);
Assert.AreEqual(0, provider.MessageSenderCache.Count);
Assert.AreEqual(0, provider.ActionsCache.Count);
}

private static bool IsError(LogMessage logMessage)
Expand Down

0 comments on commit aef8687

Please sign in to comment.