Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

MessageReceiver.RenewLockAsync() is not updating the LockedUntilUtc on the Message #245

Merged
merged 2 commits into from
Aug 3, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/Microsoft.Azure.ServiceBus/Core/IMessageReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace Microsoft.Azure.ServiceBus.Core
/// The MessageReceiver provides advanced functionality that is not found in the
/// <see cref="QueueClient" /> or <see cref="SubscriptionClient" />. For instance,
/// <see cref="ReceiveAsync()"/>, which allows you to receive messages on demand, but also requires
/// you to manually renew locks using <see cref="RenewLockAsync(string)"/>.
/// you to manually renew locks using <see cref="RenewLockAsync(Message)"/>.
/// </remarks>
/// <seealso cref="MessageReceiver"/>
/// <seealso cref="QueueClient"/>
Expand Down Expand Up @@ -113,17 +113,17 @@ public interface IMessageReceiver : IReceiverClient
Task DeferAsync(string lockToken);

/// <summary>
/// Renews the lock on the message specified by the lock token. The lock will be renewed based on the setting specified on the queue.
/// Renews the lock on the message. The lock will be renewed based on the setting specified on the queue.
/// </summary>
/// <param name="lockToken">The lock token of the <see cref="Message" />.</param>
/// <param name="message"> <see cref="Message" />.</param>
/// <remarks>
/// When a message is received in <see cref="ServiceBus.ReceiveMode.PeekLock"/> mode, the message is locked on the server for this
/// receiver instance for a duration as specified during the Queue/Subscription creation (LockDuration).
/// If processing of the message requires longer than this duration, the lock needs to be renewed. For each renewal, the lock is renewed by
/// the entity's LockDuration.
/// </remarks>
/// <returns>The asynchronous operation.</returns>
Task<DateTime> RenewLockAsync(string lockToken);
Task RenewLockAsync(Message message);

/// <summary>
/// Fetches the next active message without changing the state of the receiver or the message source.
Expand Down
12 changes: 5 additions & 7 deletions src/Microsoft.Azure.ServiceBus/Core/MessageReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ namespace Microsoft.Azure.ServiceBus.Core
/// The MessageReceiver provides advanced functionality that is not found in the
/// <see cref="QueueClient" /> or <see cref="SubscriptionClient" />. For instance,
/// <see cref="ReceiveAsync()"/>, which allows you to receive messages on demand, but also requires
/// you to manually renew locks using <see cref="RenewLockAsync(string)"/>.
/// you to manually renew locks using <see cref="RenewLockAsync(Message)"/>.
/// It uses AMQP protocol to communicate with service.
/// </remarks>
public class MessageReceiver : ClientEntity, IMessageReceiver
Expand Down Expand Up @@ -550,27 +550,26 @@ await this.RetryPolicy.RunOperation(
/// <summary>
/// Renews the lock on the message specified by the lock token. The lock will be renewed based on the setting specified on the queue.
/// </summary>
/// <param name="lockToken">The lock token of the <see cref="Message" />.</param>
/// <param name="message"> <see cref="Message" />.</param>
/// <remarks>
/// When a message is received in <see cref="ServiceBus.ReceiveMode.PeekLock"/> mode, the message is locked on the server for this
/// receiver instance for a duration as specified during the Queue/Subscription creation (LockDuration).
/// If processing of the message requires longer than this duration, the lock needs to be renewed. For each renewal, the lock is renewed by
/// the entity's LockDuration.
/// </remarks>
/// <returns>The asynchronous operation.</returns>
public async Task<DateTime> RenewLockAsync(string lockToken)
public async Task RenewLockAsync(Message message)
{
this.ThrowIfNotPeekLockMode();

MessagingEventSource.Log.MessageRenewLockStart(this.ClientId, 1, lockToken);
MessagingEventSource.Log.MessageRenewLockStart(this.ClientId, 1, message.SystemProperties.LockToken);

DateTime lockedUntilUtc = DateTime.Now;
try
{
await this.RetryPolicy.RunOperation(
async () =>
{
lockedUntilUtc = await this.OnRenewLockAsync(lockToken).ConfigureAwait(false);
message.SystemProperties.LockedUntilUtc = await this.OnRenewLockAsync(message.SystemProperties.LockToken).ConfigureAwait(false);
}, this.OperationTimeout)
.ConfigureAwait(false);
}
Expand All @@ -581,7 +580,6 @@ await this.RetryPolicy.RunOperation(
}

MessagingEventSource.Log.MessageRenewLockStop(this.ClientId);
return lockedUntilUtc;
}

/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion src/Microsoft.Azure.ServiceBus/MessageReceivePump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ async Task RenewMessageLockTask(Message message, CancellationToken renewLockCanc
if (!this.pumpCancellationToken.IsCancellationRequested &&
!renewLockCancellationToken.IsCancellationRequested)
{
await this.messageReceiver.RenewLockAsync(message.SystemProperties.LockToken).ConfigureAwait(false);
await this.messageReceiver.RenewLockAsync(message).ConfigureAwait(false);
MessagingEventSource.Log.MessageReceiverPumpRenewMessageStop(this.messageReceiver.ClientId, message);
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ namespace Microsoft.Azure.ServiceBus.Core
System.Threading.Tasks.Task<System.Collections.Generic.IList<Microsoft.Azure.ServiceBus.Message>> ReceiveAsync(int maxMessageCount, System.TimeSpan operationTimeout);
System.Threading.Tasks.Task<Microsoft.Azure.ServiceBus.Message> ReceiveDeferredMessageAsync(long sequenceNumber);
System.Threading.Tasks.Task<System.Collections.Generic.IList<Microsoft.Azure.ServiceBus.Message>> ReceiveDeferredMessageAsync(System.Collections.Generic.IEnumerable<long> sequenceNumbers);
System.Threading.Tasks.Task<System.DateTime> RenewLockAsync(string lockToken);
System.Threading.Tasks.Task RenewLockAsync(Microsoft.Azure.ServiceBus.Message message);
}
public interface IMessageSender : Microsoft.Azure.ServiceBus.Core.ISenderClient, Microsoft.Azure.ServiceBus.IClientEntity { }
public interface IReceiverClient : Microsoft.Azure.ServiceBus.IClientEntity
Expand Down Expand Up @@ -429,7 +429,7 @@ namespace Microsoft.Azure.ServiceBus.Core
public void RegisterMessageHandler(System.Func<Microsoft.Azure.ServiceBus.Message, System.Threading.CancellationToken, System.Threading.Tasks.Task> handler, System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> exceptionReceivedHandler) { }
public void RegisterMessageHandler(System.Func<Microsoft.Azure.ServiceBus.Message, System.Threading.CancellationToken, System.Threading.Tasks.Task> handler, Microsoft.Azure.ServiceBus.MessageHandlerOptions messageHandlerOptions) { }
public override void RegisterPlugin(Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin serviceBusPlugin) { }
public System.Threading.Tasks.Task<System.DateTime> RenewLockAsync(string lockToken) { }
public System.Threading.Tasks.Task RenewLockAsync(Microsoft.Azure.ServiceBus.Message message) { }
public override void UnregisterPlugin(string serviceBusPluginName) { }
}
public class MessageSender : Microsoft.Azure.ServiceBus.ClientEntity, Microsoft.Azure.ServiceBus.Core.IMessageSender, Microsoft.Azure.ServiceBus.Core.ISenderClient, Microsoft.Azure.ServiceBus.IClientEntity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,16 +126,17 @@ internal async Task RenewLockTestCase(IMessageSender messageSender, IMessageRece
TestUtility.Log("Sleeping 10 seconds...");
await Task.Delay(TimeSpan.FromSeconds(10));

DateTime lockedUntilUtcTime = await messageReceiver.RenewLockAsync(receivedMessages.First().SystemProperties.LockToken);
TestUtility.Log($"After First Renewal: {lockedUntilUtcTime}");
Assert.True(lockedUntilUtcTime >= firstLockedUntilUtcTime + TimeSpan.FromSeconds(10));

await messageReceiver.RenewLockAsync(message);
TestUtility.Log($"After First Renewal: {message.SystemProperties.LockedUntilUtc}");
Assert.True(message.SystemProperties.LockedUntilUtc >= firstLockedUntilUtcTime + TimeSpan.FromSeconds(10));

TestUtility.Log("Sleeping 5 seconds...");
await Task.Delay(TimeSpan.FromSeconds(5));

lockedUntilUtcTime = await messageReceiver.RenewLockAsync(receivedMessages.First().SystemProperties.LockToken);
TestUtility.Log($"After Second Renewal: {lockedUntilUtcTime}");
Assert.True(lockedUntilUtcTime >= firstLockedUntilUtcTime + TimeSpan.FromSeconds(5));
await messageReceiver.RenewLockAsync(message);
TestUtility.Log($"After Second Renewal: {message.SystemProperties.LockedUntilUtc}");
Assert.True(message.SystemProperties.LockedUntilUtc >= firstLockedUntilUtcTime + TimeSpan.FromSeconds(5));

// Complete Messages
await TestUtility.CompleteMessagesAsync(messageReceiver, receivedMessages);
Expand Down