Skip to content

Commit

Permalink
[Storage] Queues - Failed message handler (#17001)
Browse files Browse the repository at this point in the history
In this PR:
- Ability to provide a handler to capture queue messages that are in invalid format.

Bonus:
- Ability to get parent queue service (like we did in #16437 )

Pending dependency: #18170
  • Loading branch information
kasobol-msft authored Jan 29, 2021
1 parent 21ebca7 commit 20c4c3f
Show file tree
Hide file tree
Showing 33 changed files with 3,191 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>$(RequiredTargetFrameworks)</TargetFrameworks>
</PropertyGroup>
Expand All @@ -17,6 +17,10 @@
</Description>
<RootNamespace>Azure.Storage</RootNamespace>
</PropertyGroup>
<PropertyGroup>
<!-- Force a project reference until SyncAsyncEventHandler has shipped -->
<UseProjectReferenceToAzureClients>true</UseProjectReferenceToAzureClients>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Azure.Core" />
</ItemGroup>
Expand Down
14 changes: 14 additions & 0 deletions sdk/storage/Azure.Storage.Queues/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,20 @@ QueueClient queue = new QueueClient(accountUri, new DefaultAzureCredential());

Learn more about enabling Azure Active Directory for authentication with Azure Storage in [our documentation][storage_ad] and [our samples](#next-steps).

### Message encoding

This version of library does not encode message by default. V11 and prior versions as well as Azure Functions use base64-encoded messages by default.
Therefore it's recommended to use this feature for interop scenarios.

```C# Snippet:Azure_Storage_Queues_Samples_Sample03_MessageEncoding_ConfigureMessageEncodingAsync
QueueClientOptions queueClientOptions = new QueueClientOptions()
{
MessageEncoding = QueueMessageEncoding.Base64
};

QueueClient queueClient = new QueueClient(connectionString, queueName, queueClientOptions);
```

## Troubleshooting

All Azure Storage Queue service operations will throw a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ public QueueClient(System.Uri queueUri, Azure.Storage.StorageSharedKeyCredential
public virtual System.Uri GenerateSasUri(Azure.Storage.Sas.QueueSasPermissions permissions, System.DateTimeOffset expiresOn) { throw null; }
public virtual Azure.Response<System.Collections.Generic.IEnumerable<Azure.Storage.Queues.Models.QueueSignedIdentifier>> GetAccessPolicy(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response<System.Collections.Generic.IEnumerable<Azure.Storage.Queues.Models.QueueSignedIdentifier>>> GetAccessPolicyAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
protected internal virtual Azure.Storage.Queues.QueueServiceClient GetParentQueueServiceClientCore() { throw null; }
public virtual Azure.Response<Azure.Storage.Queues.Models.QueueProperties> GetProperties(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Storage.Queues.Models.QueueProperties>> GetPropertiesAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
protected virtual System.Threading.Tasks.Task OnMessageDecodingFailedAsync(Azure.Storage.Queues.Models.QueueMessage receivedMessage, Azure.Storage.Queues.Models.PeekedMessage peekedMessage, bool runSynchronously, System.Threading.CancellationToken cancellationToken) { throw null; }
public virtual Azure.Response<Azure.Storage.Queues.Models.PeekedMessage> PeekMessage(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Storage.Queues.Models.PeekedMessage>> PeekMessageAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Response<Azure.Storage.Queues.Models.PeekedMessage[]> PeekMessages(int? maxMessages = default(int?), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
Expand Down Expand Up @@ -72,6 +74,7 @@ public QueueClientOptions(Azure.Storage.Queues.QueueClientOptions.ServiceVersion
public System.Uri GeoRedundantSecondaryUri { get { throw null; } set { } }
public Azure.Storage.Queues.QueueMessageEncoding MessageEncoding { get { throw null; } set { } }
public Azure.Storage.Queues.QueueClientOptions.ServiceVersion Version { get { throw null; } }
public event Azure.Core.SyncAsyncEventHandler<Azure.Storage.Queues.QueueMessageDecodingFailedEventArgs> MessageDecodingFailed { add { } remove { } }
public enum ServiceVersion
{
V2019_02_02 = 1,
Expand All @@ -82,6 +85,13 @@ public enum ServiceVersion
V2020_06_12 = 6,
}
}
public partial class QueueMessageDecodingFailedEventArgs : Azure.SyncAsyncEventArgs
{
public QueueMessageDecodingFailedEventArgs(Azure.Storage.Queues.QueueClient queueClient, Azure.Storage.Queues.Models.QueueMessage receivedMessage, Azure.Storage.Queues.Models.PeekedMessage peekedMessage, bool runSynchronously, System.Threading.CancellationToken cancellationToken) : base (default(bool), default(System.Threading.CancellationToken)) { }
public Azure.Storage.Queues.Models.PeekedMessage PeekedMessage { get { throw null; } }
public Azure.Storage.Queues.QueueClient Queue { get { throw null; } }
public Azure.Storage.Queues.Models.QueueMessage ReceivedMessage { get { throw null; } }
}
public enum QueueMessageEncoding
{
None = 0,
Expand Down Expand Up @@ -369,6 +379,7 @@ public SpecializedQueueClientOptions(Azure.Storage.Queues.QueueClientOptions.Ser
}
public static partial class SpecializedQueueExtensions
{
public static Azure.Storage.Queues.QueueServiceClient GetParentQueueServiceClient(this Azure.Storage.Queues.QueueClient client) { throw null; }
public static Azure.Storage.Queues.QueueClient WithClientSideEncryptionOptions(this Azure.Storage.Queues.QueueClient client, Azure.Storage.ClientSideEncryptionOptions clientSideEncryptionOptions) { throw null; }
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using Azure.Storage.Queues.Models;
using NUnit.Framework;

namespace Azure.Storage.Queues.Samples.Tests
{
public class Sample03_MessageEncoding : SampleTest
{
[Test]
public void ConfigureMessageEncodingAsync()
{
var connectionString = ConnectionString;
var queueName = "foo";
#region Snippet:Azure_Storage_Queues_Samples_Sample03_MessageEncoding_ConfigureMessageEncodingAsync

QueueClientOptions queueClientOptions = new QueueClientOptions()
{
MessageEncoding = QueueMessageEncoding.Base64
};

QueueClient queueClient = new QueueClient(connectionString, queueName, queueClientOptions);
#endregion
}

[Test]
public void MessageDecodingFailedHandlerAsync()
{
var connectionString = ConnectionString;
var queueName = "foo";
#region Snippet:Azure_Storage_Queues_Samples_Sample03_MessageEncoding_MessageDecodingFailedHandlerAsync

QueueClientOptions queueClientOptions = new QueueClientOptions()
{
MessageEncoding = QueueMessageEncoding.Base64
};

queueClientOptions.MessageDecodingFailed += async (QueueMessageDecodingFailedEventArgs args) =>
{
if (args.PeekedMessage != null)
{
Console.WriteLine($"Invalid message has been peeked, message id={args.PeekedMessage.MessageId} body={args.PeekedMessage.Body}");
}
else if (args.ReceivedMessage != null)
{
Console.WriteLine($"Invalid message has been received, message id={args.ReceivedMessage.MessageId} body={args.ReceivedMessage.Body}");
if (args.RunSynchronously)
{
args.Queue.DeleteMessage(args.ReceivedMessage.MessageId, args.ReceivedMessage.PopReceipt);
}
else
{
await args.Queue.DeleteMessageAsync(args.ReceivedMessage.MessageId, args.ReceivedMessage.PopReceipt);
}
}
};

QueueClient queueClient = new QueueClient(connectionString, queueName, queueClientOptions);
#endregion
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
<Compile Include="$(AzureCoreSharedSources)HttpMessageSanitizer.cs" Link="Shared\Core\%(RecursiveDir)\%(Filename)%(Extension)" />
<Compile Include="$(AzureCoreSharedSources)DiagnosticScopeFactory.cs" Link="Shared\Core\%(RecursiveDir)\%(Filename)%(Extension)" />
<Compile Include="$(AzureCoreSharedSources)TaskExtensions.cs" Link="Shared\Core\%(RecursiveDir)\%(Filename)%(Extension)" />
<Compile Include="$(AzureCoreSharedSources)SyncAsyncEventHandlerExtensions.cs" Link="Shared\Core\%(RecursiveDir)\%(Filename)%(Extension)" />
</ItemGroup>
<ItemGroup>
<Compile Include="$(AzureStorageSharedSources)ClientsideEncryption\*.cs" Link="Shared\ClientsideEncryption\%(RecursiveDir)\%(Filename)%(Extension)" />
Expand Down
Loading

0 comments on commit 20c4c3f

Please sign in to comment.