Skip to content

Commit

Permalink
Fixed issue #41
Browse files Browse the repository at this point in the history
  • Loading branch information
armannaj committed Feb 18, 2022
1 parent 95d0fbc commit 0b5a236
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 30 deletions.
6 changes: 3 additions & 3 deletions PurpleExplorer/Helpers/ITopicHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ namespace PurpleExplorer.Helpers
public interface ITopicHelper
{
public Task<NamespaceInfo> GetNamespaceInfo(string connectionString);
public Task<IList<ServiceBusTopic>> GetTopics(string connectionString);
public Task<IList<ServiceBusTopic>> GetTopicsAndSubscriptions(string connectionString);
public Task<ServiceBusTopic> GetTopic(string connectionString, string topicPath, bool retrieveSubscriptions);
public Task<IList<ServiceBusSubscription>> GetSubscriptions(string connectionString, string topicPath);
public Task<ServiceBusSubscription> GetSubscription(string connectionString, string topicPath, string subscriptionName);
public Task<IList<Message>> GetDlqMessages(string connectionString, string topic, string subscription);
public Task<IList<Models.Message>> GetMessagesBySubscription(string connectionString, string topicName, string subscriptionName);
public Task SendMessage(string connectionString, string topicPath, string content);
public Task SendMessage(string connectionString, string topicPath, AzureMessage message);
public Task DeleteMessage(string connectionString, string topicPath, string subscriptionPath, Message message, bool isDlq);
public Task<SubscriptionRuntimeInfo> GetSubscriptionRuntimeInfo(string connectionString, string topicPath,
string subscriptionName);
public Task<long> PurgeMessages(string connectionString, string topicPath, string subscriptionPath, bool isDlq);
public Task<long> TransferDlqMessages(string connectionString, string topicPath, string subscriptionPath);
public Task ResubmitDlqMessage(string connectionString, string topicPath, string subscriptionPath,
Expand Down
37 changes: 24 additions & 13 deletions PurpleExplorer/Helpers/TopicHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,38 +16,49 @@ public class TopicHelper : ITopicHelper
{
private int _maxMessageCount = 100;

public async Task<IList<ServiceBusTopic>> GetTopics(string connectionString)
public async Task<IList<ServiceBusTopic>> GetTopicsAndSubscriptions(string connectionString)
{
IList<ServiceBusTopic> topics = new List<ServiceBusTopic>();
var client = new ManagementClient(connectionString);
var busTopics = await client.GetTopicsAsync();
await client.CloseAsync();

await Task.WhenAll(busTopics.Select(async t =>
await Task.WhenAll(busTopics.Select(async topic =>
{
var topicName = t.Path;
var subscriptions = await GetSubscriptions(connectionString, topicName);
var newTopic = new ServiceBusTopic
{
Name = topicName
};
var newTopic = new ServiceBusTopic(topic);
var subscriptions = await GetSubscriptions(connectionString, newTopic.Name);
newTopic.AddSubscriptions(subscriptions.ToArray());
topics.Add(newTopic);
}));

return topics;
}

public async Task<SubscriptionRuntimeInfo> GetSubscriptionRuntimeInfo(string connectionString,
string topicPath, string subscriptionName)
public async Task<ServiceBusTopic> GetTopic(string connectionString, string topicPath, bool retrieveSubscriptions)
{
ManagementClient client = new ManagementClient(connectionString);
var client = new ManagementClient(connectionString);
var busTopics = await client.GetTopicAsync(topicPath);
await client.CloseAsync();

var newTopic = new ServiceBusTopic(busTopics);

if (retrieveSubscriptions)
{
var subscriptions = await GetSubscriptions(connectionString, newTopic.Name);
newTopic.AddSubscriptions(subscriptions.ToArray());
}

return newTopic;
}

public async Task<ServiceBusSubscription> GetSubscription(string connectionString, string topicPath, string subscriptionName)
{
var client = new ManagementClient(connectionString);
var runtimeInfo = await client.GetSubscriptionRuntimeInfoAsync(topicPath, subscriptionName);
await client.CloseAsync();

return runtimeInfo;
return new ServiceBusSubscription(runtimeInfo);
}

public async Task<IList<ServiceBusSubscription>> GetSubscriptions(string connectionString, string topicPath)
Expand Down
26 changes: 20 additions & 6 deletions PurpleExplorer/Models/MessageCollection.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using DynamicData;
using ReactiveUI;

Expand All @@ -10,6 +11,7 @@ namespace PurpleExplorer.Models
/// </summary>
public abstract class MessageCollection : ReactiveObject
{
// These are needed to be set before fetching messages, in the second constructor
private long _messageCount;
private long _dlqCount;

Expand All @@ -19,13 +21,13 @@ public abstract class MessageCollection : ReactiveObject
public long MessageCount
{
get => _messageCount;
set => this.RaiseAndSetIfChanged(ref _messageCount, value);
private set => this.RaiseAndSetIfChanged(ref _messageCount, value);
}

public long DlqCount
{
get => _dlqCount;
set => this.RaiseAndSetIfChanged(ref _dlqCount, value);
private set => this.RaiseAndSetIfChanged(ref _dlqCount, value);
}

protected MessageCollection()
Expand All @@ -43,25 +45,37 @@ protected MessageCollection(long messageCount, long dlqCount) : this()
public void AddMessages(IEnumerable<Message> messages)
{
Messages.AddRange(messages);
_messageCount = Messages.Count;
MessageCount = Messages.Count;
}

public void RemoveMessage(string messageId)
{
Messages.Remove(Messages.Single(msg => msg.MessageId.Equals(messageId)));
MessageCount = Messages.Count;
}

public void ClearMessages()
{
Messages.Clear();
_messageCount = 0;
MessageCount = Messages.Count;
}

public void AddDlqMessages(IEnumerable<Message> dlqMessages)
{
DlqMessages.AddRange(dlqMessages);
_dlqCount = DlqMessages.Count;
DlqCount = DlqMessages.Count;
}

public void RemoveDlqMessage(string messageId)
{
DlqMessages.Remove(DlqMessages.Single(msg => msg.MessageId.Equals(messageId)));
DlqCount = DlqMessages.Count;
}

public void ClearDlqMessages()
{
DlqMessages.Clear();
_dlqCount = 0;
DlqCount = DlqMessages.Count;
}
}
}
8 changes: 8 additions & 0 deletions PurpleExplorer/Models/ServiceBusResource.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
using System;
using System.Collections.ObjectModel;

namespace PurpleExplorer.Models
{
public class ServiceBusResource
{
public string Name { get; set; }
public DateTime CreatedTime { get; set; }
public string ConnectionString { get; set; }
public ObservableCollection<ServiceBusQueue> Queues { get; private set; }
public ObservableCollection<ServiceBusTopic> Topics { get; private set; }
Expand All @@ -30,5 +32,11 @@ public void AddQueues(params ServiceBusQueue[] queues)
Queues.Add(queue);
}
}

public override bool Equals(object? obj)
{
var comparingResource = obj as ServiceBusResource;
return Name.Equals(comparingResource.Name) && CreatedTime.Equals(comparingResource.CreatedTime);
}
}
}
11 changes: 11 additions & 0 deletions PurpleExplorer/Models/ServiceBusTopic.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.ObjectModel;
using Microsoft.Azure.ServiceBus.Management;

namespace PurpleExplorer.Models
{
Expand All @@ -8,6 +9,16 @@ public class ServiceBusTopic
public ObservableCollection<ServiceBusSubscription> Subscriptions { get; private set; }
public ServiceBusResource ServiceBus { get; set; }

public ServiceBusTopic()
{
}

public ServiceBusTopic(TopicDescription topicDescription)
{
Name = topicDescription.Path;
}


public void AddSubscriptions(params ServiceBusSubscription[] subscriptions)
{
Subscriptions ??= new ObservableCollection<ServiceBusSubscription>();
Expand Down
44 changes: 36 additions & 8 deletions PurpleExplorer/ViewModels/MainWindowViewModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -183,18 +183,19 @@ await ModalWindowHelper.ShowModalWindow<ConnectionStringWindow, ConnectionString
LoggingService.Log("Connecting...");

var namespaceInfo = await _topicHelper.GetNamespaceInfo(ConnectionString);
var topics = await _topicHelper.GetTopics(ConnectionString);
var topics = await _topicHelper.GetTopicsAndSubscriptions(ConnectionString);
var queues = await _queueHelper.GetQueues(ConnectionString);

var newResource = new ServiceBusResource
var serviceBusResource = new ServiceBusResource
{
Name = namespaceInfo.Name,
ConnectionString = this.ConnectionString
CreatedTime = namespaceInfo.CreatedTime,
ConnectionString = ConnectionString
};

newResource.AddQueues(queues.ToArray());
newResource.AddTopics(topics.ToArray());
ConnectedServiceBuses.Add(newResource);
serviceBusResource.AddQueues(queues.ToArray());
serviceBusResource.AddTopics(topics.ToArray());
ConnectedServiceBuses.Add(serviceBusResource);
LoggingService.Log("Connected to Service Bus: " + namespaceInfo.Name);
}
catch (ArgumentException)
Expand Down Expand Up @@ -306,6 +307,20 @@ public void RefreshTabHeaders()
}
}

public async Task RefreshConnectedServiceBuses()
{
foreach (var serviceBusResource in ConnectedServiceBuses)
{
var topicsAndSubscriptions = await _topicHelper.GetTopicsAndSubscriptions(serviceBusResource.ConnectionString);
var serviceBusQueues = await _queueHelper.GetQueues(serviceBusResource.ConnectionString);

serviceBusResource.Topics.Clear();
serviceBusResource.Queues.Clear();
serviceBusResource.AddTopics(topicsAndSubscriptions.ToArray());
serviceBusResource.AddQueues(serviceBusQueues.ToArray());
}
}

public async void AddMessage()
{
var viewModal = new AddMessageWindowViewModal();
Expand Down Expand Up @@ -419,21 +434,34 @@ public async void PurgeMessages(string isDlqText)
var connectionString = CurrentSubscription.Topic.ServiceBus.ConnectionString;
purgedCount = await _topicHelper.PurgeMessages(connectionString, _currentTopic.Name,
_currentSubscription.Name, isDlq);

if (!isDlq)
CurrentSubscription.ClearMessages();
else
CurrentSubscription.ClearDlqMessages();
}

if (CurrentQueue != null)
{
var connectionString = CurrentQueue.ServiceBus.ConnectionString;
purgedCount = await _queueHelper.PurgeMessages(connectionString, _currentQueue.Name, isDlq);

if (!isDlq)
CurrentQueue.ClearMessages();
else
CurrentQueue.ClearDlqMessages();
}

LoggingService.Log($"Purged {purgedCount} messages in {purgingPath}");

// Refreshing messages
await FetchMessages();
}

public async Task Refresh()
{
await FetchMessages();
await RefreshConnectedServiceBuses();
RefreshTabHeaders();
await FetchMessages();
}

public async Task FetchMessages()
Expand Down
10 changes: 10 additions & 0 deletions PurpleExplorer/ViewModels/MessageDetailsWindowViewModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,22 @@ public async void DeleteMessage(Window window)
var connectionString = Subscription.Topic.ServiceBus.ConnectionString;
await _topicHelper.DeleteMessage(connectionString, Subscription.Topic.Name, Subscription.Name,
Message, Message.IsDlq);

if(!Message.IsDlq)
Subscription.RemoveMessage(Message.MessageId);
else
Subscription.RemoveDlqMessage(Message.MessageId);
}

if (Queue != null)
{
var connectionString = Queue.ServiceBus.ConnectionString;
await _queueHelper.DeleteMessage(connectionString, Queue.Name, Message, Message.IsDlq);

if(!Message.IsDlq)
Queue.RemoveMessage(Message.MessageId);
else
Queue.RemoveDlqMessage(Message.MessageId);
}

_loggingService.Log($"Message deleted, MessageId: {Message.MessageId}");
Expand Down

0 comments on commit 0b5a236

Please sign in to comment.