Skip to content

Commit

Permalink
Refactor: Reuse consumer base logic
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
  • Loading branch information
zarusz committed Jun 1, 2023
1 parent ad24d90 commit 86966dd
Show file tree
Hide file tree
Showing 27 changed files with 299 additions and 413 deletions.
94 changes: 36 additions & 58 deletions src/SlimMessageBus.Host.AzureEventHub/Consumer/EhGroupConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,48 +3,72 @@ namespace SlimMessageBus.Host.AzureEventHub;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Processor;

using SlimMessageBus.Host.Collections;

public class EhGroupConsumer : IAsyncDisposable, IConsumerControl
public class EhGroupConsumer : AbstractConsumer
{
private readonly ILogger _logger;
private readonly EventProcessorClient _processorClient;
private readonly SafeDictionaryWrapper<string, EhPartitionConsumer> _partitionConsumerByPartitionId;
private readonly GroupPath _groupPath;

public EventHubMessageBus MessageBus { get; }
public bool IsStarted { get; private set; }

public EhGroupConsumer(EventHubMessageBus messageBus, GroupPath groupPath, Func<GroupPathPartitionId, EhPartitionConsumer> partitionConsumerFactory)
public EhGroupConsumer(EventHubMessageBus messageBus, GroupPath groupPath, Func<GroupPathPartitionId, EhPartitionConsumer> partitionConsumerFactory)
: base(messageBus.LoggerFactory.CreateLogger<EhGroupConsumer>())
{
_groupPath = groupPath ?? throw new ArgumentNullException(nameof(groupPath));
_ = partitionConsumerFactory ?? throw new ArgumentNullException(nameof(partitionConsumerFactory));
if (partitionConsumerFactory == null) throw new ArgumentNullException(nameof(partitionConsumerFactory));

MessageBus = messageBus ?? throw new ArgumentNullException(nameof(messageBus));
_logger = messageBus.LoggerFactory.CreateLogger<EhGroupConsumer>();

_partitionConsumerByPartitionId = new SafeDictionaryWrapper<string, EhPartitionConsumer>(partitionId =>
{
_logger.LogDebug("Creating PartitionConsumer for Group: {Group}, Path: {Path}, PartitionId: {PartitionId}", groupPath.Group, groupPath.Path, partitionId);
Logger.LogDebug("Creating PartitionConsumer for Group: {Group}, Path: {Path}, PartitionId: {PartitionId}", groupPath.Group, groupPath.Path, partitionId);
try
{
return partitionConsumerFactory(new GroupPathPartitionId(groupPath, partitionId));
}
catch (Exception e)
{
_logger.LogError(e, "Error creating PartitionConsumer for Group: {Group}, Path: {Path}, PartitionId: {PartitionId}", groupPath.Group, groupPath.Path, partitionId);
Logger.LogError(e, "Error creating PartitionConsumer for Group: {Group}, Path: {Path}, PartitionId: {PartitionId}", groupPath.Group, groupPath.Path, partitionId);
throw;
}
});

_logger.LogInformation("Creating EventProcessorClient for EventHub with Group: {Group}, Path: {Path}", groupPath.Group, groupPath.Path);
Logger.LogInformation("Creating EventProcessorClient for EventHub with Group: {Group}, Path: {Path}", groupPath.Group, groupPath.Path);
_processorClient = MessageBus.ProviderSettings.EventHubProcessorClientFactory(new ConsumerParams(groupPath.Path, groupPath.Group, messageBus.BlobContainerClient));
_processorClient.PartitionInitializingAsync += PartitionInitializingAsync;
_processorClient.PartitionClosingAsync += PartitionClosingAsync;
_processorClient.ProcessEventAsync += ProcessEventHandler;
_processorClient.ProcessErrorAsync += ProcessErrorHandler;
}

protected override async Task OnStart()
{
if (!_processorClient.IsRunning)
{
Logger.LogInformation("Starting consumer Group: {Group}, Path: {Path}...", _groupPath.Group, _groupPath.Path);
await _processorClient.StartProcessingAsync().ConfigureAwait(false);
}
}

protected override async Task OnStop()
{
if (_processorClient.IsRunning)
{
Logger.LogInformation("Stopping consumer Group: {Group}, Path: {Path}...", _groupPath.Group, _groupPath.Path);

// stop the processing host
await _processorClient.StopProcessingAsync().ConfigureAwait(false);

var partitionConsumers = _partitionConsumerByPartitionId.ClearAndSnapshot();

if (MessageBus.ProviderSettings.EnableCheckpointOnBusStop)
{
// checkpoint anything we've processed thus far
await Task.WhenAll(partitionConsumers.Select(pc => pc.TryCheckpoint()));
}
}
}

private Task PartitionInitializingAsync(PartitionInitializingEventArgs args)
{
var partitionId = args.PartitionId;
Expand Down Expand Up @@ -76,54 +100,8 @@ private Task ProcessErrorHandler(ProcessErrorEventArgs args)
}
else
{
_logger.LogError(args.Exception, "Group error - Group: {Group}, Path: {Path}, Operation: {Operation}", _groupPath.Group, _groupPath.Path, args.Operation);
Logger.LogError(args.Exception, "Group error - Group: {Group}, Path: {Path}, Operation: {Operation}", _groupPath.Group, _groupPath.Path, args.Operation);
}
return Task.CompletedTask;
}

public async Task Start()
{
if (!_processorClient.IsRunning)
{
_logger.LogInformation("Starting consumer Group: {Group}, Path: {Path}...", _groupPath.Group, _groupPath.Path);
await _processorClient.StartProcessingAsync().ConfigureAwait(false);
IsStarted = true;
}
}

public async Task Stop()
{
if (_processorClient.IsRunning)
{
_logger.LogInformation("Stopping consumer Group: {Group}, Path: {Path}...", _groupPath.Group, _groupPath.Path);

// stop the processing host
await _processorClient.StopProcessingAsync().ConfigureAwait(false);

var partitionConsumers = _partitionConsumerByPartitionId.ClearAndSnapshot();

if (MessageBus.ProviderSettings.EnableCheckpointOnBusStop)
{
// checkpoint anything we've processed thus far
await Task.WhenAll(partitionConsumers.Select(pc => pc.TryCheckpoint()));
}

IsStarted = false;
}
}

#region IAsyncDisposable

public async ValueTask DisposeAsync()
{
await DisposeAsyncCore().ConfigureAwait(false);
GC.SuppressFinalize(this);
}

protected virtual async ValueTask DisposeAsyncCore()
{
await Stop().ConfigureAwait(false);
}

#endregion
}
34 changes: 9 additions & 25 deletions src/SlimMessageBus.Host.AzureEventHub/EventHubMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ public class EventHubMessageBus : MessageBusBase<EventHubMessageBusSettings>
private readonly ILogger _logger;
private BlobContainerClient _blobContainerClient;
private SafeDictionaryWrapper<string, EventHubProducerClient> _producerByPath;
private List<EhGroupConsumer> _groupConsumers;

protected internal BlobContainerClient BlobContainerClient => _blobContainerClient;

Expand Down Expand Up @@ -48,38 +47,31 @@ protected override void Build()
throw;
}
});

_groupConsumers = new List<EhGroupConsumer>();

_logger.LogInformation("Creating consumers");
}

protected override async Task CreateConsumers()
{
await base.CreateConsumers();

foreach (var (groupPath, consumerSettings) in Settings.Consumers.GroupBy(x => new GroupPath(path: x.Path, group: x.GetGroup())).ToDictionary(x => x.Key, x => x.ToList()))
{
_logger.LogInformation("Creating consumer for Path: {Path}, Group: {Group}", groupPath.Path, groupPath.Group);
_groupConsumers.Add(new EhGroupConsumer(this, groupPath, groupPathPartition => new EhPartitionConsumerForConsumers(this, consumerSettings, groupPathPartition)));
AddConsumer(new EhGroupConsumer(this, groupPath, groupPathPartition => new EhPartitionConsumerForConsumers(this, consumerSettings, groupPathPartition)));
}

if (Settings.RequestResponse != null)
{
var pathGroup = new GroupPath(Settings.RequestResponse.Path, Settings.RequestResponse.GetGroup());
_logger.LogInformation("Creating response consumer for Path: {Path}, Group: {Group}", pathGroup.Path, pathGroup.Group);
_groupConsumers.Add(new EhGroupConsumer(this, pathGroup, groupPathPartition => new EhPartitionConsumerForResponses(this, Settings.RequestResponse, groupPathPartition)));
AddConsumer(new EhGroupConsumer(this, pathGroup, groupPathPartition => new EhPartitionConsumerForResponses(this, Settings.RequestResponse, groupPathPartition)));
}

}

protected override async ValueTask DisposeAsyncCore()
{
await base.DisposeAsyncCore();

if (_groupConsumers != null)
{
foreach (var groupConsumer in _groupConsumers)
{
await groupConsumer.DisposeSilently("Consumer", _logger);
}
_groupConsumers.Clear();
_groupConsumers = null;
}

if (_producerByPath != null)
{
var producers = _producerByPath.ClearAndSnapshot();
Expand Down Expand Up @@ -108,15 +100,7 @@ protected override async Task OnStart()
_logger.LogWarning(e, "Attempt to create blob container {BlobContainer} failed - the blob container is needed to store the consumer group offsets", _blobContainerClient.Name);
}
}

await Task.WhenAll(_groupConsumers.Select(x => x.Start()));
}

protected override async Task OnStop()
{
await base.OnStop();
await Task.WhenAll(_groupConsumers.Select(x => x.Stop()));
}

protected override async Task ProduceToTransport(object message, string path, byte[] messagePayload, IDictionary<string, object> messageHeaders, CancellationToken cancellationToken)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace SlimMessageBus.Host.AzureServiceBus.Consumer;

public abstract class AsbBaseConsumer : AbstractConsumer, IConsumerControl
public abstract class AsbBaseConsumer : AbstractConsumer
{
private ServiceBusProcessor _serviceBusProcessor;
private ServiceBusSessionProcessor _serviceBusSessionProcessor;
Expand Down Expand Up @@ -112,6 +112,7 @@ protected override async Task OnStart()
protected override async Task OnStop()
{
Logger.LogInformation("Stopping consumer for Path: {Path}, SubscriptionName: {SubscriptionName}", TopicSubscription.Path, TopicSubscription.SubscriptionName);

if (_serviceBusProcessor != null)
{
await _serviceBusProcessor.StopProcessingAsync().ConfigureAwait(false);
Expand Down
5 changes: 0 additions & 5 deletions src/SlimMessageBus.Host.AzureServiceBus/GlobalSuppressions.cs

This file was deleted.

Loading

0 comments on commit 86966dd

Please sign in to comment.