Skip to content

Commit

Permalink
[Event Hubs Client] Track Two (Timer Dispose Race Condition Fix)
Browse files Browse the repository at this point in the history
The focus of these changes is to address a race condition that could
cause the authorization refresh timer callback to throw an ObjectDisposedException
due to a set of benign race conditions.  These changes attempt to make those
conditions less likely but cannot elimitate them completely without introducing
synchronization overhead.  Because the condition is benign, accept the
resulting exception as an expected case rather than paying that cost.
  • Loading branch information
jsquire committed Dec 18, 2019
1 parent 4ac370a commit 36093bc
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ protected virtual async Task<ReceivingAmqpLink> CreateReceivingLinkAsync(AmqpCon
endpoint.AbsoluteUri,
authClaims,
AuthorizationRefreshTimeout,
() => refreshTimer
() => (ActiveLinks.ContainsKey(link) ? refreshTimer : null)
);

refreshTimer = new Timer(refreshHandler, null, CalculateLinkAuthorizationRefreshInterval(authExpirationUtc), Timeout.InfiniteTimeSpan);
Expand Down Expand Up @@ -754,6 +754,11 @@ protected virtual TimerCallback CreateAuthorizationRefreshHandler(AmqpConnection
try
{
if (refreshTimer == null)
{
return;
}
var authExpirationUtc = await RequestAuthorizationUsingCbsAsync(connection, tokenProvider, endpoint, audience, resource, requiredClaims, refreshTimeout).ConfigureAwait(false);
// Reset the timer for the next refresh.
Expand All @@ -763,6 +768,12 @@ protected virtual TimerCallback CreateAuthorizationRefreshHandler(AmqpConnection
refreshTimer.Change(CalculateLinkAuthorizationRefreshInterval(authExpirationUtc), Timeout.InfiniteTimeSpan);
}
}
catch (ObjectDisposedException)
{
// This can occur if the connection is closed or the scope disposed after the factory
// is called but before the timer is updated. The callback may also fire while the timer is
// in the act of disposing. Do not consider it an error.
}
catch (Exception ex)
{
EventHubsEventSource.Log.AmqpLinkAuthorizationRefreshError(EventHubName, endpoint.AbsoluteUri, ex.Message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1372,6 +1372,70 @@ public async Task OpenProducerLinkAsyncRefreshesAuthorization()
}
}

/// <summary>
/// Verifies functionality of the <see cref="AmqpConnectionScope.OpenProducerLinkAsync" />
/// method.
/// </summary>
///
[Test]
public async Task AuthorizationTimerCallbackToleratesDisposal()
{
var endpoint = new Uri("amqp://test.service.gov");
var eventHub = "myHub";
var credential = new Mock<EventHubTokenCredential>(Mock.Of<TokenCredential>(), "{namespace}.servicebus.windows.net");
var transport = EventHubsTransportType.AmqpTcp;
var cancellationSource = new CancellationTokenSource();
var mockConnection = new AmqpConnection(new MockTransport(), CreateMockAmqpSettings(), new AmqpConnectionSettings());
var mockSession = new AmqpSession(mockConnection, new AmqpSessionSettings(), Mock.Of<ILinkFactory>());
var mockScope = new DisposeOnAuthorizationTimerCallbackMockScope(endpoint, eventHub, credential.Object, transport, null);

var link = await mockScope.OpenProducerLinkAsync(null, TimeSpan.FromDays(1), cancellationSource.Token);
Assert.That(link, Is.Not.Null, "The link produced was null");

var activeLinks = GetActiveLinks(mockScope);
Assert.That(activeLinks.ContainsKey(link), Is.True, "The producer link should be tracked as active.");

activeLinks.TryGetValue(link, out var refreshTimer);
Assert.That(refreshTimer, Is.Not.Null, "The link should have a non-null timer.");

// Reset the timer so that it fires immediately and validate that authorization was
// requested. Since opening of the link requests an initial authorization and the expiration
// was set way in the future, there should be exactly two calls.
//
// Because the timer runs in the background, there is a level of non-determinism in when that
// callback will execute. Allow for a small number of delay and retries to account for it.

refreshTimer.Change(0, Timeout.Infinite);

var attemptCount = 0;
var remainingAttempts = 10;
var success = false;

while ((--remainingAttempts >= 0) && (!success))
{
try
{
await Task.Delay(250 * ++attemptCount).ConfigureAwait(false);
success = ((mockScope.IsDisposed) && (mockScope.CallbackInvoked));
}
catch (ObjectDisposedException)
{
Assert.Fail("No disposed exception should have been triggered");
}
catch when (remainingAttempts <= 0)
{
throw;
}
catch
{
// No action needed.
}

Assert.That(mockScope.IsDisposed, Is.True, "The scope should have been disposed.");
Assert.That(mockScope.CallbackInvoked, Is.True, "The authorization timer callback should have been invoked.");
}
}

/// <summary>
/// Verifies functionality of the <see cref="AmqpConnectionScope.Dispose" />
/// method.
Expand Down Expand Up @@ -1666,5 +1730,63 @@ public MockTransport() : base("Mock") { }
protected override void AbortInternal() => throw new NotImplementedException();
protected override bool CloseInternal() => throw new NotImplementedException();
}

/// <summary>
/// Provides a mock which disposes the scope before invoking the default timer callback for authorization.
/// </summary>
///
private class DisposeOnAuthorizationTimerCallbackMockScope : AmqpConnectionScope
{
public bool CallbackInvoked = false;

private AmqpConnection _mockConnection;
private AmqpSession _mockSession;

public DisposeOnAuthorizationTimerCallbackMockScope(Uri serviceEndpoint,
string eventHubName,
EventHubTokenCredential credential,
EventHubsTransportType transport,
IWebProxy proxy) : base(serviceEndpoint, eventHubName, credential, transport, proxy)
{
_mockConnection = new AmqpConnection(new MockTransport(), CreateMockAmqpSettings(), new AmqpConnectionSettings());
_mockSession = new AmqpSession(_mockConnection, new AmqpSessionSettings(), Mock.Of<ILinkFactory>());
}

protected override Task<AmqpConnection> CreateAndOpenConnectionAsync(Version amqpVersion,
Uri serviceEndpoint,
EventHubsTransportType transportType,
IWebProxy proxy,
string scopeIdentifier,
TimeSpan timeout) => Task.FromResult(_mockConnection);

protected override Task OpenAmqpObjectAsync(AmqpObject target, TimeSpan timeout) => Task.CompletedTask;

protected override TimerCallback CreateAuthorizationRefreshHandler(AmqpConnection connection,
AmqpObject amqpLink,
CbsTokenProvider tokenProvider,
Uri endpoint,
string audience,
string resource,
string[] requiredClaims,
TimeSpan refreshTimeout,
Func<Timer> refreshTimerFactory)
{
Action baseImplementation = () => base.CreateAuthorizationRefreshHandler(connection, amqpLink, tokenProvider, endpoint, audience, resource, requiredClaims, refreshTimeout, refreshTimerFactory);

return state =>
{
CallbackInvoked = true;
Dispose();
baseImplementation();
};
}
protected override Task<DateTime> RequestAuthorizationUsingCbsAsync(AmqpConnection connection,
CbsTokenProvider tokenProvider,
Uri endpoint,
string audience,
string resource,
string[] requiredClaims,
TimeSpan timeout) => Task.FromResult(DateTime.Now.AddMinutes(60));
}
}
}

0 comments on commit 36093bc

Please sign in to comment.