From e2545685f130c4a0891d08b333904f83b5d94e66 Mon Sep 17 00:00:00 2001 From: Luke Warren Date: Tue, 31 Jan 2023 00:21:58 +0200 Subject: [PATCH] Fix: Connection Limit Invalid for Queue/Topic Limits Greater than 100 (#73) * fix: connection string invalid for queue limit > 100 * fix: connection string invalid for topic limit > 100 * chore: move max per page constant to BaseHelper.cs --- PurpleExplorer/Helpers/BaseHelper.cs | 2 + PurpleExplorer/Helpers/ITopicHelper.cs | 3 - PurpleExplorer/Helpers/QueueHelper.cs | 81 +++++++++++++------ PurpleExplorer/Helpers/TopicHelper.cs | 106 +++++++++++++++++-------- 4 files changed, 132 insertions(+), 60 deletions(-) diff --git a/PurpleExplorer/Helpers/BaseHelper.cs b/PurpleExplorer/Helpers/BaseHelper.cs index 17fa0db..5cd1aef 100644 --- a/PurpleExplorer/Helpers/BaseHelper.cs +++ b/PurpleExplorer/Helpers/BaseHelper.cs @@ -9,6 +9,8 @@ namespace PurpleExplorer.Helpers; public abstract class BaseHelper { + protected const int MaxRequestItemsPerPage = 100; + protected ManagementClient GetManagementClient(ServiceBusConnectionString connectionString) { if (connectionString.UseManagedIdentity) diff --git a/PurpleExplorer/Helpers/ITopicHelper.cs b/PurpleExplorer/Helpers/ITopicHelper.cs index 0e7b6cf..7ca8b12 100644 --- a/PurpleExplorer/Helpers/ITopicHelper.cs +++ b/PurpleExplorer/Helpers/ITopicHelper.cs @@ -10,9 +10,6 @@ public interface ITopicHelper { public Task GetNamespaceInfo(ServiceBusConnectionString connectionString); public Task> GetTopicsAndSubscriptions(ServiceBusConnectionString connectionString); - public Task GetTopic(ServiceBusConnectionString connectionString, string topicPath, bool retrieveSubscriptions); - public Task> GetSubscriptions(ServiceBusConnectionString connectionString, string topicPath); - public Task GetSubscription(ServiceBusConnectionString connectionString, string topicPath, string subscriptionName); public Task> GetDlqMessages(ServiceBusConnectionString connectionString, string topic, string subscription); public Task> GetMessagesBySubscription(ServiceBusConnectionString connectionString, string topicName, string subscriptionName); public Task SendMessage(ServiceBusConnectionString connectionString, string topicPath, string content); diff --git a/PurpleExplorer/Helpers/QueueHelper.cs b/PurpleExplorer/Helpers/QueueHelper.cs index 1e674da..53d51f3 100644 --- a/PurpleExplorer/Helpers/QueueHelper.cs +++ b/PurpleExplorer/Helpers/QueueHelper.cs @@ -5,6 +5,7 @@ using System.Threading.Tasks; using Microsoft.Azure.ServiceBus; using Microsoft.Azure.ServiceBus.Core; +using Microsoft.Azure.ServiceBus.Management; using PurpleExplorer.Models; using Message = PurpleExplorer.Models.Message; using AzureMessage = Microsoft.Azure.ServiceBus.Message; @@ -22,29 +23,15 @@ public QueueHelper(AppSettings appSettings) public async Task> GetQueues(ServiceBusConnectionString connectionString) { - IList queues = new List(); var client = GetManagementClient(connectionString); - var queuesInfo = await client.GetQueuesRuntimeInfoAsync(_appSettings.QueueListFetchCount); - await client.CloseAsync(); - - await Task.WhenAll(queuesInfo.Select(async queue => - { - var queueName = queue.Path; - - var newQueue = new ServiceBusQueue(queue) - { - Name = queueName - }; - - queues.Add(newQueue); - })); - + var queues = await GetQueues(client); + await client.CloseAsync(); return queues; } - + public async Task SendMessage(ServiceBusConnectionString connectionString, string queueName, string content) { - var message = new AzureMessage {Body = Encoding.UTF8.GetBytes(content)}; + var message = new AzureMessage { Body = Encoding.UTF8.GetBytes(content) }; await SendMessage(connectionString, queueName, message); } @@ -72,7 +59,7 @@ public async Task> GetDlqMessages(ServiceBusConnectionString conn return receivedMessages.Select(message => new Message(message, true)).ToList(); } - + public async Task DeadletterMessage(ServiceBusConnectionString connectionString, string queue, Message message) { var receiver = GetMessageReceiver(connectionString, queue, ReceiveMode.PeekLock); @@ -95,7 +82,7 @@ public async Task DeadletterMessage(ServiceBusConnectionString connectionString, await receiver.CloseAsync(); } - + public async Task DeleteMessage(ServiceBusConnectionString connectionString, string queue, Message message, bool isDlq) { @@ -121,18 +108,19 @@ public async Task DeleteMessage(ServiceBusConnectionString connectionString, str await receiver.CloseAsync(); } - - private async Task PeekDlqMessageBySequenceNumber(ServiceBusConnectionString connectionString, string queue, long sequenceNumber) + + private async Task PeekDlqMessageBySequenceNumber(ServiceBusConnectionString connectionString, + string queue, long sequenceNumber) { var deadletterPath = EntityNameHelper.FormatDeadLetterPath(queue); var receiver = GetMessageReceiver(connectionString, deadletterPath, ReceiveMode.PeekLock); var azureMessage = await receiver.PeekBySequenceNumberAsync(sequenceNumber); await receiver.CloseAsync(); - + return azureMessage; } - + public async Task ResubmitDlqMessage(ServiceBusConnectionString connectionString, string queue, Message message) { var azureMessage = await PeekDlqMessageBySequenceNumber(connectionString, queue, message.SequenceNumber); @@ -165,7 +153,7 @@ public async Task PurgeMessages(ServiceBusConnectionString connectionStrin await receiver.CloseAsync(); return purgedCount; } - + public async Task TransferDlqMessages(ServiceBusConnectionString connectionString, string queuePath) { var path = EntityNameHelper.FormatDeadLetterPath(queuePath); @@ -193,7 +181,7 @@ public async Task TransferDlqMessages(ServiceBusConnectionString connectio } finally { - if (receiver != null) + if (receiver != null) await receiver.CloseAsync(); if (sender != null) @@ -202,4 +190,45 @@ public async Task TransferDlqMessages(ServiceBusConnectionString connectio return transferredCount; } + + private async Task> GetQueues(ManagementClient client) + { + var queueInfos = new List(); + var numberOfPages = _appSettings.QueueListFetchCount / MaxRequestItemsPerPage; + var remainder = _appSettings.QueueListFetchCount % (numberOfPages * MaxRequestItemsPerPage); + + for (int pageCount = 0; pageCount < numberOfPages; pageCount++) + { + var numberToSkip = MaxRequestItemsPerPage * pageCount; + var page = await client.GetQueuesRuntimeInfoAsync(MaxRequestItemsPerPage, numberToSkip); + if (page.Any()) + { + queueInfos.AddRange(page); + } + else + { + return queueInfos + .Select(q => new ServiceBusQueue(q) + { + Name = q.Path + }).ToList(); + } + } + + if (remainder > 0) + { + var numberAlreadyFetched = numberOfPages > 0 + ? MaxRequestItemsPerPage * numberOfPages + : 0; + var remainingItems = await client.GetQueuesRuntimeInfoAsync( + remainder, + numberAlreadyFetched); + queueInfos.AddRange(remainingItems); + } + + return queueInfos.Select(q => new ServiceBusQueue(q) + { + Name = q.Path + }).ToList(); + } } \ No newline at end of file diff --git a/PurpleExplorer/Helpers/TopicHelper.cs b/PurpleExplorer/Helpers/TopicHelper.cs index f8e1212..9774ee4 100644 --- a/PurpleExplorer/Helpers/TopicHelper.cs +++ b/PurpleExplorer/Helpers/TopicHelper.cs @@ -7,6 +7,7 @@ using System.Linq; using System.Text; using System.Threading.Tasks; +using AvaloniaEdit.Utils; using Message = PurpleExplorer.Models.Message; using AzureMessage = Microsoft.Azure.ServiceBus.Message; @@ -23,41 +24,76 @@ public TopicHelper(AppSettings appSettings) public async Task> GetTopicsAndSubscriptions(ServiceBusConnectionString connectionString) { - IList topics = new List(); var client = GetManagementClient(connectionString); - var busTopics = await client.GetTopicsAsync(_appSettings.TopicListFetchCount); + var topics = await GetTopicsWithSubscriptions(client); await client.CloseAsync(); + return topics; + } + + private async Task CreateTopicWithSubscriptions(ManagementClient client, TopicDescription topicDescription) + { + var topic = new ServiceBusTopic(topicDescription); + var subscriptions = await GetSubscriptions(client, topicDescription.Path); + topic.AddSubscriptions(subscriptions.ToArray()); + return topic; + } + + private async Task> GetTopicsWithSubscriptions(ManagementClient client) + { + var topicDescriptions = new List(); + var numberOfPages = _appSettings.TopicListFetchCount / MaxRequestItemsPerPage; + var remainder = _appSettings.TopicListFetchCount % (numberOfPages * MaxRequestItemsPerPage); - await Task.WhenAll(busTopics.Select(async topic => + for (int pageCount = 0; pageCount < numberOfPages; pageCount++) { - var newTopic = new ServiceBusTopic(topic); + var numberToSkip = MaxRequestItemsPerPage * pageCount; + var page = await client.GetTopicsAsync(MaxRequestItemsPerPage, numberToSkip); + if (page.Any()) + { + topicDescriptions.AddRange(page); + } + else + { + return (await Task.WhenAll(topicDescriptions + .Select(async topic => await CreateTopicWithSubscriptions(client, topic)))).ToList(); + } + } - var subscriptions = await GetSubscriptions(connectionString, newTopic.Name); - newTopic.AddSubscriptions(subscriptions.ToArray()); - topics.Add(newTopic); - })); + if (remainder > 0) + { + var numberAlreadyFetched = numberOfPages > 0 + ? MaxRequestItemsPerPage * numberOfPages + : 0; + var remainingItems = await client.GetTopicsAsync( + remainder, + numberAlreadyFetched); + topicDescriptions.AddRange(remainingItems); + } - return topics; + var topics = await Task.WhenAll(topicDescriptions + .Select(async topic => await CreateTopicWithSubscriptions(client, topic))); + return topics.ToList(); } - public async Task GetTopic(ServiceBusConnectionString connectionString, string topicPath, bool retrieveSubscriptions) + public async Task GetTopic(ServiceBusConnectionString connectionString, string topicPath, + bool retrieveSubscriptions) { var client = GetManagementClient(connectionString); var busTopics = await client.GetTopicAsync(topicPath); - await client.CloseAsync(); - var newTopic = new ServiceBusTopic(busTopics); if (retrieveSubscriptions) { - var subscriptions = await GetSubscriptions(connectionString, newTopic.Name); + var subscriptions = await GetSubscriptions(client, newTopic.Name); newTopic.AddSubscriptions(subscriptions.ToArray()); } - + + await client.CloseAsync(); return newTopic; } - - public async Task GetSubscription(ServiceBusConnectionString connectionString, string topicPath, string subscriptionName) + + public async Task GetSubscription(ServiceBusConnectionString connectionString, + string topicPath, string subscriptionName) { var client = GetManagementClient(connectionString); var runtimeInfo = await client.GetSubscriptionRuntimeInfoAsync(topicPath, subscriptionName); @@ -66,12 +102,12 @@ public async Task GetSubscription(ServiceBusConnectionSt return new ServiceBusSubscription(runtimeInfo); } - public async Task> GetSubscriptions(ServiceBusConnectionString connectionString, string topicPath) + private async Task> GetSubscriptions( + ManagementClient client, + string topicPath) { IList subscriptions = new List(); - var client = GetManagementClient(connectionString); var topicSubscription = await client.GetSubscriptionsRuntimeInfoAsync(topicPath); - await client.CloseAsync(); foreach (var sub in topicSubscription) { @@ -81,7 +117,8 @@ public async Task> GetSubscriptions(ServiceBusConn return subscriptions; } - public async Task> GetMessagesBySubscription(ServiceBusConnectionString connectionString, string topicName, + public async Task> GetMessagesBySubscription(ServiceBusConnectionString connectionString, + string topicName, string subscriptionName) { var path = EntityNameHelper.FormatSubscriptionPath(topicName, subscriptionName); @@ -94,7 +131,8 @@ public async Task> GetMessagesBySubscription(ServiceBusConnection return result; } - public async Task> GetDlqMessages(ServiceBusConnectionString connectionString, string topic, string subscription) + public async Task> GetDlqMessages(ServiceBusConnectionString connectionString, string topic, + string subscription) { var path = EntityNameHelper.FormatSubscriptionPath(topic, subscription); var deadletterPath = EntityNameHelper.FormatDeadLetterPath(path); @@ -115,7 +153,7 @@ public async Task GetNamespaceInfo(ServiceBusConnectionString con public async Task SendMessage(ServiceBusConnectionString connectionString, string topicPath, string content) { - var message = new AzureMessage {Body = Encoding.UTF8.GetBytes(content)}; + var message = new AzureMessage { Body = Encoding.UTF8.GetBytes(content) }; await SendMessage(connectionString, topicPath, message); } @@ -126,7 +164,8 @@ public async Task SendMessage(ServiceBusConnectionString connectionString, strin await topicClient.CloseAsync(); } - public async Task DeleteMessage(ServiceBusConnectionString connectionString, string topicPath, string subscriptionPath, + public async Task DeleteMessage(ServiceBusConnectionString connectionString, string topicPath, + string subscriptionPath, Message message, bool isDlq) { var path = EntityNameHelper.FormatSubscriptionPath(topicPath, subscriptionPath); @@ -153,7 +192,8 @@ public async Task DeleteMessage(ServiceBusConnectionString connectionString, str await receiver.CloseAsync(); } - public async Task PurgeMessages(ServiceBusConnectionString connectionString, string topicPath, string subscriptionPath, + public async Task PurgeMessages(ServiceBusConnectionString connectionString, string topicPath, + string subscriptionPath, bool isDlq) { var path = EntityNameHelper.FormatSubscriptionPath(topicPath, subscriptionPath); @@ -177,11 +217,12 @@ public async Task PurgeMessages(ServiceBusConnectionString connectionStrin return purgedCount; } - public async Task TransferDlqMessages(ServiceBusConnectionString connectionString, string topicPath, string subscriptionPath) + public async Task TransferDlqMessages(ServiceBusConnectionString connectionString, string topicPath, + string subscriptionPath) { var path = EntityNameHelper.FormatSubscriptionPath(topicPath, subscriptionPath); path = EntityNameHelper.FormatDeadLetterPath(path); - + long transferredCount = 0; MessageReceiver receiver = null; TopicClient sender = null; @@ -205,7 +246,7 @@ public async Task TransferDlqMessages(ServiceBusConnectionString connectio } finally { - if (receiver != null) + if (receiver != null) await receiver.CloseAsync(); if (sender != null) @@ -215,7 +256,8 @@ public async Task TransferDlqMessages(ServiceBusConnectionString connectio return transferredCount; } - private async Task PeekDlqMessageBySequenceNumber(ServiceBusConnectionString connectionString, string topicPath, + private async Task PeekDlqMessageBySequenceNumber(ServiceBusConnectionString connectionString, + string topicPath, string subscriptionPath, long sequenceNumber) { var path = EntityNameHelper.FormatSubscriptionPath(topicPath, subscriptionPath); @@ -224,11 +266,12 @@ private async Task PeekDlqMessageBySequenceNumber(ServiceBusConnec var receiver = GetMessageReceiver(connectionString, deadletterPath, ReceiveMode.PeekLock); var azureMessage = await receiver.PeekBySequenceNumberAsync(sequenceNumber); await receiver.CloseAsync(); - + return azureMessage; } - public async Task ResubmitDlqMessage(ServiceBusConnectionString connectionString, string topicPath, string subscriptionPath, + public async Task ResubmitDlqMessage(ServiceBusConnectionString connectionString, string topicPath, + string subscriptionPath, Message message) { var azureMessage = await PeekDlqMessageBySequenceNumber(connectionString, topicPath, subscriptionPath, @@ -240,7 +283,8 @@ public async Task ResubmitDlqMessage(ServiceBusConnectionString connectionString await DeleteMessage(connectionString, topicPath, subscriptionPath, message, true); } - public async Task DeadletterMessage(ServiceBusConnectionString connectionString, string topicPath, string subscriptionPath, + public async Task DeadletterMessage(ServiceBusConnectionString connectionString, string topicPath, + string subscriptionPath, Message message) { var path = EntityNameHelper.FormatSubscriptionPath(topicPath, subscriptionPath);