Skip to content

Commit

Permalink
Logging clean up (Azure#12222)
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLove-msft authored May 21, 2020
1 parent 0056ab8 commit f235f94
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,6 @@ public virtual async Task<RequestResponseAmqpLink> OpenManagementLinkAsync(
/// <param name="receiveMode">The <see cref="ReceiveMode"/> used to specify how messages are received. Defaults to PeekLock mode.</param>
/// <param name="sessionId"></param>
/// <param name="isSessionReceiver"></param>
/// <param name="identifier">The identifier for the receive link.</param>
/// <param name="timeout">The timeout to apply when creating the link.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
Expand All @@ -284,44 +283,34 @@ public virtual async Task<ReceivingAmqpLink> OpenReceiverLinkAsync(
ReceiveMode receiveMode,
string sessionId,
bool isSessionReceiver,
string identifier,
CancellationToken cancellationToken)
{
ServiceBusEventSource.Log.CreateReceiveLinkStart(identifier);
try
{
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

var stopWatch = ValueStopwatch.StartNew();
var receiverEndpoint = new Uri(ServiceEndpoint, entityPath);
var stopWatch = ValueStopwatch.StartNew();
var receiverEndpoint = new Uri(ServiceEndpoint, entityPath);

var connection = await ActiveConnection.GetOrCreateAsync(timeout).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
var connection = await ActiveConnection.GetOrCreateAsync(timeout).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

ReceivingAmqpLink link = await CreateReceivingLinkAsync(
entityPath,
connection,
receiverEndpoint,
timeout.CalculateRemaining(stopWatch.GetElapsedTime()),
prefetchCount,
receiveMode,
sessionId,
isSessionReceiver,
cancellationToken
).ConfigureAwait(false);
ReceivingAmqpLink link = await CreateReceivingLinkAsync(
entityPath,
connection,
receiverEndpoint,
timeout.CalculateRemaining(stopWatch.GetElapsedTime()),
prefetchCount,
receiveMode,
sessionId,
isSessionReceiver,
cancellationToken
).ConfigureAwait(false);

cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

await OpenAmqpObjectAsync(link, timeout.CalculateRemaining(stopWatch.GetElapsedTime())).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
return link;

await OpenAmqpObjectAsync(link, timeout.CalculateRemaining(stopWatch.GetElapsedTime())).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
ServiceBusEventSource.Log.CreateReceiveLinkComplete(identifier);
return link;
}
catch (Exception ex)
{
ServiceBusEventSource.Log.CreateReceiveLinkException(identifier, ex.ToString());
throw;
}
}

/// <summary>
Expand Down Expand Up @@ -910,28 +899,30 @@ protected virtual TimerCallback CreateAuthorizationRefreshHandler(
refreshTimeout)
.ConfigureAwait(false);
// Reset the timer for the next refresh.
// Reset the timer for the next refresh.
if (authExpirationUtc >= DateTimeOffset.UtcNow)
if (authExpirationUtc >= DateTimeOffset.UtcNow)
{
refreshTimer.Change(CalculateLinkAuthorizationRefreshInterval(authExpirationUtc), Timeout.InfiniteTimeSpan);
}
}
catch (ObjectDisposedException)
{
// This can occur if the connection is closed or the scope disposed after the factory
// is called but before the timer is updated. The callback may also fire while the timer is
// in the act of disposing. Do not consider it an error.
}
// This can occur if the connection is closed or the scope disposed after the factory
// is called but before the timer is updated. The callback may also fire while the timer is
// in the act of disposing. Do not consider it an error.
}
catch (Exception ex)
{
ServiceBusEventSource.Log.AmqpLinkAuthorizationRefreshError(entityPath, endpoint.AbsoluteUri, ex.Message);
// Attempt to unset the timer; there's a decent chance that it has been disposed at this point or
// that the connection has been closed. Ignore potential exceptions, as they won't impact operation.
// At worse, another timer tick will occur and the operation will be retried.
// Attempt to unset the timer; there's a decent chance that it has been disposed at this point or
// that the connection has been closed. Ignore potential exceptions, as they won't impact operation.
// At worse, another timer tick will occur and the operation will be retried.
try { refreshTimer.Change(Timeout.Infinite, Timeout.Infinite); } catch {}
try
{ refreshTimer.Change(Timeout.Infinite, Timeout.Infinite); }
catch { }
}
finally
{
Expand Down
36 changes: 31 additions & 5 deletions sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,12 @@ public AmqpReceiver(

_receiveLink = new FaultTolerantAmqpObject<ReceivingAmqpLink>(
timeout =>
_connectionScope.OpenReceiverLinkAsync(
entityPath: _entityPath,
OpenReceiverLinkAsync(
timeout: timeout,
prefetchCount: prefetchCount,
receiveMode: receiveMode,
sessionId: sessionId,
isSessionReceiver: isSessionReceiver,
identifier: _identifier,
cancellationToken: CancellationToken.None),
isSessionReceiver: isSessionReceiver),
link => CloseLink(link));

_managementLink = new FaultTolerantAmqpObject<RequestResponseAmqpLink>(
Expand All @@ -155,6 +152,35 @@ public AmqpReceiver(
link => CloseLink(link));
}

private async Task<ReceivingAmqpLink> OpenReceiverLinkAsync(
TimeSpan timeout,
uint prefetchCount,
ReceiveMode receiveMode,
string sessionId,
bool isSessionReceiver)
{
ServiceBusEventSource.Log.CreateReceiveLinkStart(_identifier);

try
{
ReceivingAmqpLink link = await _connectionScope.OpenReceiverLinkAsync(
entityPath: _entityPath,
timeout: timeout,
prefetchCount: prefetchCount,
receiveMode: receiveMode,
sessionId: sessionId,
isSessionReceiver: isSessionReceiver,
cancellationToken: CancellationToken.None).ConfigureAwait(false);
ServiceBusEventSource.Log.CreateReceiveLinkComplete(_identifier);
return link;
}
catch (Exception ex)
{
ServiceBusEventSource.Log.CreateReceiveLinkException(_identifier, ex.ToString());
throw;
}
}

private void CloseLink(ReceivingAmqpLink link)
{
link.Session?.SafeClose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ internal ServiceBusEventSource() { }
internal const int AmqpLinkRefreshStartEvent = 39;
internal const int AmqpLinkRefreshCompleteEvent = 40;
internal const int AmqpLinkRefreshExceptionEvent = 41;

internal const int ManagementSerializedExceptionEvent = 42;
internal const int RunOperationExceptionEvent = 43;

internal const int ClientDisposeStartEvent = 44;
Expand Down Expand Up @@ -170,8 +172,6 @@ internal ServiceBusEventSource() { }

internal const int ProcessorErrorHandlerThrewExceptionEvent = 94;
internal const int ScheduleTaskFailedEvent = 95;
internal const int ManagementSerializedExceptionEvent = 96;


#endregion
// add new event numbers here incrementing from previous
Expand Down Expand Up @@ -724,8 +724,6 @@ public void ProcessorErrorHandlerThrewException(string exception)
}
}



#endregion region

#region Rule management
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ public void ReceiveAsyncAppliesTheRetryPolicy(ServiceBusRetryOptions retryOption
It.IsAny<ReceiveMode>(),
It.IsAny<string>(),
It.IsAny<bool>(),
It.IsAny<string>(),
It.IsAny<CancellationToken>()))
.Throws(retriableException);

Expand All @@ -180,7 +179,6 @@ public void ReceiveAsyncAppliesTheRetryPolicy(ServiceBusRetryOptions retryOption
It.IsAny<ReceiveMode>(),
It.IsAny<string>(),
It.IsAny<bool>(),
It.IsAny<string>(),
It.IsAny<CancellationToken>()),
Times.Exactly(1 + retryOptions.MaxRetries));
}
Expand Down Expand Up @@ -218,7 +216,6 @@ public void ReceiveAsyncConsidersOperationCanceledExceptionAsRetriable(ServiceBu
It.IsAny<ReceiveMode>(),
It.IsAny<string>(),
It.IsAny<bool>(),
It.IsAny<string>(),
It.IsAny<CancellationToken>()))
.Throws(retriableException);

Expand All @@ -236,7 +233,6 @@ public void ReceiveAsyncConsidersOperationCanceledExceptionAsRetriable(ServiceBu
It.IsAny<ReceiveMode>(),
It.IsAny<string>(),
It.IsAny<bool>(),
It.IsAny<string>(),
It.IsAny<CancellationToken>()),
Times.Exactly(1 + retryOptions.MaxRetries));
}
Expand Down Expand Up @@ -282,7 +278,6 @@ public void ReceiveAsyncAppliesTheRetryPolicyForAmqpErrors(ServiceBusRetryOption
It.IsAny<ReceiveMode>(),
It.IsAny<string>(),
It.IsAny<bool>(),
It.IsAny<string>(),
It.IsAny<CancellationToken>()))
.Throws(retriableException);

Expand All @@ -300,7 +295,6 @@ public void ReceiveAsyncAppliesTheRetryPolicyForAmqpErrors(ServiceBusRetryOption
It.IsAny<ReceiveMode>(),
It.IsAny<string>(),
It.IsAny<bool>(),
It.IsAny<string>(),
It.IsAny<CancellationToken>()),
Times.Exactly(1 + retryOptions.MaxRetries));

Expand Down Expand Up @@ -342,7 +336,6 @@ public void ReceiveAsyncDetectsAnEmbeddedErrorForOperationCanceled()
It.IsAny<ReceiveMode>(),
It.IsAny<string>(),
It.IsAny<bool>(),
It.IsAny<string>(),
It.IsAny<CancellationToken>()))
.Throws(exception);

Expand All @@ -360,7 +353,6 @@ public void ReceiveAsyncDetectsAnEmbeddedErrorForOperationCanceled()
It.IsAny<ReceiveMode>(),
It.IsAny<string>(),
It.IsAny<bool>(),
It.IsAny<string>(),
It.IsAny<CancellationToken>()),
Times.Once());
}
Expand Down Expand Up @@ -398,7 +390,6 @@ public void ReceiveAsyncDetectsAnEmbeddedAmqpErrorForOperationCanceled()
It.IsAny<ReceiveMode>(),
It.IsAny<string>(),
It.IsAny<bool>(),
It.IsAny<string>(),
It.IsAny<CancellationToken>()))
.Throws(exception);

Expand All @@ -416,7 +407,6 @@ public void ReceiveAsyncDetectsAnEmbeddedAmqpErrorForOperationCanceled()
It.IsAny<ReceiveMode>(),
It.IsAny<string>(),
It.IsAny<bool>(),
It.IsAny<string>(),
It.IsAny<CancellationToken>()),
Times.Once());
}
Expand Down Expand Up @@ -453,7 +443,6 @@ public void ReceiveAsyncDoesntRetryOnTaskCanceled()
It.IsAny<ReceiveMode>(),
It.IsAny<string>(),
It.IsAny<bool>(),
It.IsAny<string>(),
It.IsAny<CancellationToken>()))
.Throws(exception);

Expand All @@ -471,7 +460,6 @@ public void ReceiveAsyncDoesntRetryOnTaskCanceled()
It.IsAny<ReceiveMode>(),
It.IsAny<string>(),
It.IsAny<bool>(),
It.IsAny<string>(),
It.IsAny<CancellationToken>()),
Times.Once());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ public async Task LogsTransactionEvents()
await sender.SendAsync(message);
ts.Complete();
}
// Adding delay since transaction Commit/Rollback is an asynchronous operation.
await Task.Delay(TimeSpan.FromSeconds(2));
_listener.SingleEventById(ServiceBusEventSource.TransactionDeclaredEvent);
_listener.SingleEventById(ServiceBusEventSource.TransactionDischargedEvent);
};
Expand Down

0 comments on commit f235f94

Please sign in to comment.