Skip to content

Commit

Permalink
EventHubConsumer fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
paolos authored and paolos committed Mar 7, 2017
1 parent c95df49 commit 7dd0063
Show file tree
Hide file tree
Showing 12 changed files with 722 additions and 689 deletions.
2 changes: 1 addition & 1 deletion Controls/ListenerControl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1229,7 +1229,7 @@ private async void AsyncTrackMessage()
{
try
{
while (!logStopped)
while (!cancellationTokenSource.IsCancellationRequested && !logStopped)
{
try
{
Expand Down
47 changes: 23 additions & 24 deletions Controls/PartitionListenerControl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
using System.Linq;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
// ReSharper disable CoVariantArrayConversion
#endregion

namespace Microsoft.WindowsAzure.CAT.ServiceBusExplorer
Expand Down Expand Up @@ -149,7 +150,9 @@ public partial class PartitionListenerControl : UserControl
private Dictionary<string, bool> registeredDictionary = new Dictionary<string, bool>();
private bool clearing;
private bool cleared;
private string iotHubConnectionString;
private readonly string iotHubConnectionString;
public Task AsyncTrackEventDataTask { get; private set; }

#endregion

#region Public Constructors
Expand All @@ -165,13 +168,6 @@ public PartitionListenerControl(WriteToLogDelegate writeToLog,
ConsumerGroupDescription consumerGroupDescription,
IEnumerable<PartitionDescription> partitionDescriptions)
{
Task.Factory.StartNew(AsyncTrackEventData).ContinueWith(t =>
{
if (t.IsFaulted && t.Exception != null)
{
writeToLog(t.Exception.Message);
}
});
this.writeToLog = writeToLog;
this.stopLog = stopLog;
this.startLog = startLog;
Expand All @@ -185,9 +181,11 @@ public PartitionListenerControl(WriteToLogDelegate writeToLog,
? eventHubClient.GetDefaultConsumerGroup()
: eventHubClient.GetConsumerGroup(consumerGroupDescription.Name);
IList<string> partitionIdList = partitionDescriptions.Select(pd => pd.PartitionId).ToList();
foreach (var id in partitionIdList)
var taskList = partitionIdList.Select(id => eventHubClient.GetPartitionRuntimeInformationAsync(id)).ToList();
Task.WaitAll(taskList.ToArray());
foreach (var task in taskList)
{
partitionRuntumeInformationList.Add(eventHubClient.GetPartitionRuntimeInformation(id));
partitionRuntumeInformationList.Add(task.Result);
}
partitionCount = partitionRuntumeInformationList.Count;
InitializeComponent();
Expand All @@ -202,13 +200,6 @@ public PartitionListenerControl(WriteToLogDelegate writeToLog,
string hubName,
string consumerGroupName)
{
Task.Factory.StartNew(AsyncTrackEventData).ContinueWith(t =>
{
if (t.IsFaulted && t.Exception != null)
{
writeToLog(t.Exception.Message);
}
});
this.iotHubConnectionString = iotHubConnectionString;
this.writeToLog = writeToLog;
this.stopLog = stopLog;
Expand All @@ -221,9 +212,11 @@ public PartitionListenerControl(WriteToLogDelegate writeToLog,
? eventHubClient.GetDefaultConsumerGroup()
: eventHubClient.GetConsumerGroup(consumerGroupName);
IList<string> partitionIdList = new List<string>(eventHubClient.GetRuntimeInformation().PartitionIds);
foreach (var id in partitionIdList)
var taskList = partitionIdList.Select(id => eventHubClient.GetPartitionRuntimeInformationAsync(id)).ToList();
Task.WaitAll(taskList.ToArray());
foreach (var task in taskList)
{
partitionRuntumeInformationList.Add(eventHubClient.GetPartitionRuntimeInformation(id));
partitionRuntumeInformationList.Add(task.Result);
}
partitionCount = partitionRuntumeInformationList.Count;
InitializeComponent();
Expand Down Expand Up @@ -782,6 +775,15 @@ private async void btnStart_Click(object sender, EventArgs e)
: null;

cancellationTokenSource = new CancellationTokenSource();

AsyncTrackEventDataTask = Task.Factory.StartNew(AsyncTrackEventData, cancellationTokenSource.Token).ContinueWith(t =>
{
if (t.IsFaulted && t.Exception != null)
{
writeToLog(t.Exception.Message);
}
});

btnStart.Text = Stop;
blockingCollection = new BlockingCollection<Tuple<long, long, long>>();
timer = new System.Timers.Timer
Expand Down Expand Up @@ -899,7 +901,7 @@ private async void AsyncTrackEventData()
{
try
{
while (true)
while (!cancellationTokenSource.IsCancellationRequested)
{
try
{
Expand Down Expand Up @@ -1103,10 +1105,7 @@ private void timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)

private async Task StopListenerAsync()
{
if (cancellationTokenSource != null)
{
cancellationTokenSource.Cancel();
}
cancellationTokenSource?.Cancel();
lock (this)
{
stopping = true;
Expand Down
4 changes: 2 additions & 2 deletions Forms/AboutForm.Designer.cs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 18 additions & 18 deletions Forms/MainForm.Designer.cs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions Forms/MainForm.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1313,6 +1313,19 @@ private void connectToolStripMenuItem_Click(object sender, EventArgs e)
connectForm.TransportType);
}
}
// Set Relay Host Name
//var assembly = Assembly.GetAssembly(typeof(ServiceBus.ServiceBusEnvironment));
//var type = assembly.GetType("Microsoft.ServiceBus.RelayEnvironment");
//if (type != null)
//{
// var property = type.GetProperty("RelayHostRootName",
// BindingFlags.Static |
// BindingFlags.Public);
// if (property != null && serviceBusHelper.NamespaceUri != null)
// {
// property.SetValue(null, serviceBusHelper.GetHostWithoutNamespace());
// }
//}
foreach (var userControl in panelMain.Controls.OfType<UserControl>())
{
userControl.Dispose();
Expand Down
Loading

0 comments on commit 7dd0063

Please sign in to comment.