diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConnectionScope.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConnectionScope.cs index d15dd063f6b7..ac83a8ada25c 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConnectionScope.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConnectionScope.cs @@ -543,7 +543,7 @@ protected virtual async Task CreateReceivingLinkAsync(AmqpCon endpoint.AbsoluteUri, authClaims, AuthorizationRefreshTimeout, - () => refreshTimer + () => (ActiveLinks.ContainsKey(link) ? refreshTimer : null) ); refreshTimer = new Timer(refreshHandler, null, CalculateLinkAuthorizationRefreshInterval(authExpirationUtc), Timeout.InfiniteTimeSpan); @@ -754,6 +754,11 @@ protected virtual TimerCallback CreateAuthorizationRefreshHandler(AmqpConnection try { + if (refreshTimer == null) + { + return; + } + var authExpirationUtc = await RequestAuthorizationUsingCbsAsync(connection, tokenProvider, endpoint, audience, resource, requiredClaims, refreshTimeout).ConfigureAwait(false); // Reset the timer for the next refresh. @@ -763,6 +768,12 @@ protected virtual TimerCallback CreateAuthorizationRefreshHandler(AmqpConnection refreshTimer.Change(CalculateLinkAuthorizationRefreshInterval(authExpirationUtc), Timeout.InfiniteTimeSpan); } } + catch (ObjectDisposedException) + { + // This can occur if the connection is closed or the scope disposed after the factory + // is called but before the timer is updated. The callback may also fire while the timer is + // in the act of disposing. Do not consider it an error. + } catch (Exception ex) { EventHubsEventSource.Log.AmqpLinkAuthorizationRefreshError(EventHubName, endpoint.AbsoluteUri, ex.Message); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpConnectionScopeTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpConnectionScopeTests.cs index 6821bb564898..d2989ccf2013 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpConnectionScopeTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpConnectionScopeTests.cs @@ -1372,6 +1372,70 @@ public async Task OpenProducerLinkAsyncRefreshesAuthorization() } } + /// + /// Verifies functionality of the + /// method. + /// + /// + [Test] + public async Task AuthorizationTimerCallbackToleratesDisposal() + { + var endpoint = new Uri("amqp://test.service.gov"); + var eventHub = "myHub"; + var credential = new Mock(Mock.Of(), "{namespace}.servicebus.windows.net"); + var transport = EventHubsTransportType.AmqpTcp; + var cancellationSource = new CancellationTokenSource(); + var mockConnection = new AmqpConnection(new MockTransport(), CreateMockAmqpSettings(), new AmqpConnectionSettings()); + var mockSession = new AmqpSession(mockConnection, new AmqpSessionSettings(), Mock.Of()); + var mockScope = new DisposeOnAuthorizationTimerCallbackMockScope(endpoint, eventHub, credential.Object, transport, null); + + var link = await mockScope.OpenProducerLinkAsync(null, TimeSpan.FromDays(1), cancellationSource.Token); + Assert.That(link, Is.Not.Null, "The link produced was null"); + + var activeLinks = GetActiveLinks(mockScope); + Assert.That(activeLinks.ContainsKey(link), Is.True, "The producer link should be tracked as active."); + + activeLinks.TryGetValue(link, out var refreshTimer); + Assert.That(refreshTimer, Is.Not.Null, "The link should have a non-null timer."); + + // Reset the timer so that it fires immediately and validate that authorization was + // requested. Since opening of the link requests an initial authorization and the expiration + // was set way in the future, there should be exactly two calls. + // + // Because the timer runs in the background, there is a level of non-determinism in when that + // callback will execute. Allow for a small number of delay and retries to account for it. + + refreshTimer.Change(0, Timeout.Infinite); + + var attemptCount = 0; + var remainingAttempts = 10; + var success = false; + + while ((--remainingAttempts >= 0) && (!success)) + { + try + { + await Task.Delay(250 * ++attemptCount).ConfigureAwait(false); + success = ((mockScope.IsDisposed) && (mockScope.CallbackInvoked)); + } + catch (ObjectDisposedException) + { + Assert.Fail("No disposed exception should have been triggered"); + } + catch when (remainingAttempts <= 0) + { + throw; + } + catch + { + // No action needed. + } + + Assert.That(mockScope.IsDisposed, Is.True, "The scope should have been disposed."); + Assert.That(mockScope.CallbackInvoked, Is.True, "The authorization timer callback should have been invoked."); + } + } + /// /// Verifies functionality of the /// method. @@ -1666,5 +1730,63 @@ public MockTransport() : base("Mock") { } protected override void AbortInternal() => throw new NotImplementedException(); protected override bool CloseInternal() => throw new NotImplementedException(); } + + /// + /// Provides a mock which disposes the scope before invoking the default timer callback for authorization. + /// + /// + private class DisposeOnAuthorizationTimerCallbackMockScope : AmqpConnectionScope + { + public bool CallbackInvoked = false; + + private AmqpConnection _mockConnection; + private AmqpSession _mockSession; + + public DisposeOnAuthorizationTimerCallbackMockScope(Uri serviceEndpoint, + string eventHubName, + EventHubTokenCredential credential, + EventHubsTransportType transport, + IWebProxy proxy) : base(serviceEndpoint, eventHubName, credential, transport, proxy) + { + _mockConnection = new AmqpConnection(new MockTransport(), CreateMockAmqpSettings(), new AmqpConnectionSettings()); + _mockSession = new AmqpSession(_mockConnection, new AmqpSessionSettings(), Mock.Of()); + } + + protected override Task CreateAndOpenConnectionAsync(Version amqpVersion, + Uri serviceEndpoint, + EventHubsTransportType transportType, + IWebProxy proxy, + string scopeIdentifier, + TimeSpan timeout) => Task.FromResult(_mockConnection); + + protected override Task OpenAmqpObjectAsync(AmqpObject target, TimeSpan timeout) => Task.CompletedTask; + + protected override TimerCallback CreateAuthorizationRefreshHandler(AmqpConnection connection, + AmqpObject amqpLink, + CbsTokenProvider tokenProvider, + Uri endpoint, + string audience, + string resource, + string[] requiredClaims, + TimeSpan refreshTimeout, + Func refreshTimerFactory) + { + Action baseImplementation = () => base.CreateAuthorizationRefreshHandler(connection, amqpLink, tokenProvider, endpoint, audience, resource, requiredClaims, refreshTimeout, refreshTimerFactory); + + return state => + { + CallbackInvoked = true; + Dispose(); + baseImplementation(); + }; + } + protected override Task RequestAuthorizationUsingCbsAsync(AmqpConnection connection, + CbsTokenProvider tokenProvider, + Uri endpoint, + string audience, + string resource, + string[] requiredClaims, + TimeSpan timeout) => Task.FromResult(DateTime.Now.AddMinutes(60)); + } } }