Skip to content

Commit

Permalink
Refactor: Improve configuration error reports
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
  • Loading branch information
zarusz committed May 28, 2023
1 parent 1081df8 commit 934f45c
Show file tree
Hide file tree
Showing 30 changed files with 580 additions and 288 deletions.
8 changes: 5 additions & 3 deletions docs/provider_azure_servicebus.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,11 @@ mbb.Consume<TMessage>(x => x
Where applicable, selected settings can have the default values applied using `ServiceBusMessageBusSettings`:

```cs
mbb.WithProviderServiceBus(new ServiceBusMessageBusSettings(connectionString) {
MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(7),
PrefetchCount = 10,
mbb.WithProviderServiceBus(cfg =>
{
// ...
cfg.MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(7);
cfg.PrefetchCount = 10;
})
```

Expand Down
32 changes: 6 additions & 26 deletions src/SlimMessageBus.Host.AzureEventHub/EventHubMessageBus.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace SlimMessageBus.Host.AzureEventHub;
namespace SlimMessageBus.Host.AzureEventHub;

using SlimMessageBus.Host.Services;

/// <summary>
/// MessageBus implementation for Azure Event Hub.
Expand All @@ -15,34 +17,12 @@ public class EventHubMessageBus : MessageBusBase<EventHubMessageBusSettings>
public EventHubMessageBus(MessageBusSettings settings, EventHubMessageBusSettings providerSettings)
: base(settings, providerSettings)
{
_logger = LoggerFactory.CreateLogger<EventHubMessageBus>();
_logger = LoggerFactory.CreateLogger<EventHubMessageBus>();

OnBuildProvider();
}

protected override void AssertSettings()
{
base.AssertSettings();

if (string.IsNullOrEmpty(ProviderSettings.ConnectionString))
{
throw new ConfigurationMessageBusException(Settings, $"The {nameof(EventHubMessageBusSettings)}.{nameof(EventHubMessageBusSettings.ConnectionString)} must be set");
}

if (IsAnyConsumerDeclared)
{
if (string.IsNullOrEmpty(ProviderSettings.StorageConnectionString))
{
throw new ConfigurationMessageBusException(Settings, $"When consumers are declared, the {nameof(EventHubMessageBusSettings)}.{nameof(EventHubMessageBusSettings.StorageConnectionString)} must be set");
}
if (string.IsNullOrEmpty(ProviderSettings.StorageBlobContainerName))
{
throw new ConfigurationMessageBusException(Settings, $"When consumers are declared, the {nameof(EventHubMessageBusSettings)}.{nameof(EventHubMessageBusSettings.StorageBlobContainerName)} must be set");
}
}
}

private bool IsAnyConsumerDeclared => Settings.Consumers.Count > 0 || Settings.RequestResponse != null;
protected override IMessageBusSettingsValidationService ValidationService => new EventHubMessageBusSettingsValidationService(Settings, ProviderSettings);

#region Overrides of MessageBusBase

Expand All @@ -51,7 +31,7 @@ protected override void Build()
base.Build();

// Initialize storage client only when there are consumers declared
_blobContainerClient = IsAnyConsumerDeclared
_blobContainerClient = Settings.IsAnyConsumerDeclared()
? ProviderSettings.BlobContanerClientFactory()
: null;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
namespace SlimMessageBus.Host.AzureEventHub;

internal class EventHubMessageBusSettingsValidationService : DefaultMessageBusSettingsValidationService<EventHubMessageBusSettings>
{
public EventHubMessageBusSettingsValidationService(MessageBusSettings settings, EventHubMessageBusSettings providerSettings)
: base(settings, providerSettings)
{
}

public override void AssertSettings()
{
base.AssertSettings();

if (string.IsNullOrEmpty(ProviderSettings.ConnectionString))
{
throw new ConfigurationMessageBusException(Settings, $"The {nameof(EventHubMessageBusSettings)}.{nameof(EventHubMessageBusSettings.ConnectionString)} must be set");
}

if (Settings.IsAnyConsumerDeclared())
{
if (string.IsNullOrEmpty(ProviderSettings.StorageConnectionString))
{
throw new ConfigurationMessageBusException(Settings, $"When consumers are declared, the {nameof(EventHubMessageBusSettings)}.{nameof(EventHubMessageBusSettings.StorageConnectionString)} must be set");
}
if (string.IsNullOrEmpty(ProviderSettings.StorageBlobContainerName))
{
throw new ConfigurationMessageBusException(Settings, $"When consumers are declared, the {nameof(EventHubMessageBusSettings)}.{nameof(EventHubMessageBusSettings.StorageBlobContainerName)} must be set");
}
}
}
}
1 change: 1 addition & 0 deletions src/SlimMessageBus.Host.AzureEventHub/GlobalUsings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

global using SlimMessageBus.Host;
global using SlimMessageBus.Host.Collections;
global using SlimMessageBus.Host.Services;

global using Azure.Messaging.EventHubs;
global using Azure.Messaging.EventHubs.Producer;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace SlimMessageBus.Host.AzureEventHub;

internal static class MessageBusSettingsExtensions
{
internal static bool IsAnyConsumerDeclared(this MessageBusSettings settings) => settings.Consumers.Count > 0 || settings.RequestResponse != null;
}
2 changes: 2 additions & 0 deletions src/SlimMessageBus.Host.AzureServiceBus/GlobalUsings.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
global using Microsoft.Extensions.Logging;

global using SlimMessageBus.Host;
global using SlimMessageBus.Host.Collections;
global using SlimMessageBus.Host.Services;

global using Azure.Messaging.ServiceBus;
global using Azure.Messaging.ServiceBus.Administration;
25 changes: 1 addition & 24 deletions src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
namespace SlimMessageBus.Host.AzureServiceBus;

using SlimMessageBus.Host.AzureServiceBus.Consumer;
using SlimMessageBus.Host.Collections;

public class ServiceBusMessageBus : MessageBusBase<ServiceBusMessageBusSettings>
{
Expand All @@ -21,29 +20,7 @@ public ServiceBusMessageBus(MessageBusSettings settings, ServiceBusMessageBusSet
OnBuildProvider();
}

protected override void AssertSettings()
{
base.AssertSettings();

if (string.IsNullOrEmpty(ProviderSettings.ConnectionString))
{
throw new ConfigurationMessageBusException(Settings, $"The {nameof(ServiceBusMessageBusSettings)}.{nameof(ServiceBusMessageBusSettings.ConnectionString)} must be set");
}

var kindMapping = new KindMapping();
// This will validae if one path is mapped to both a topic and a queue
kindMapping.Configure(Settings);
}

protected override void AssertConsumerSettings(ConsumerSettings consumerSettings)
{
if (consumerSettings is null) throw new ArgumentNullException(nameof(consumerSettings));

base.AssertConsumerSettings(consumerSettings);

Assert.IsTrue(consumerSettings.PathKind != PathKind.Topic || consumerSettings.GetSubscriptionName(required: false) != null,
() => new ConfigurationMessageBusException($"The {nameof(ConsumerSettings)}.{nameof(AsbConsumerBuilderExtensions.SubscriptionName)} is not set on topic {consumerSettings.Path}"));
}
protected override IMessageBusSettingsValidationService ValidationService => new ServiceBusMessageBusSettingsValidationService(Settings, ProviderSettings);

protected void AddConsumer(TopicSubscriptionParams topicSubscription, IMessageProcessor<ServiceBusReceivedMessage> messageProcessor, IEnumerable<AbstractConsumerSettings> consumerSettings)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
namespace SlimMessageBus.Host.AzureServiceBus;

internal class ServiceBusMessageBusSettingsValidationService : DefaultMessageBusSettingsValidationService<ServiceBusMessageBusSettings>
{
public ServiceBusMessageBusSettingsValidationService(MessageBusSettings settings, ServiceBusMessageBusSettings providerSettings) : base(settings, providerSettings)
{
}

public override void AssertSettings()
{
base.AssertSettings();

if (string.IsNullOrEmpty(ProviderSettings.ConnectionString))
{
ThrowFieldNotSet(nameof(ProviderSettings.ConnectionString));
}

var kindMapping = new KindMapping();
// This will validae if one path is mapped to both a topic and a queue
kindMapping.Configure(Settings);
}

protected override void AssertConsumer(ConsumerSettings consumerSettings)
{
base.AssertConsumer(consumerSettings);

if (consumerSettings.PathKind == PathKind.Topic && string.IsNullOrEmpty(consumerSettings.GetSubscriptionName(required: false)))
{
ThrowConsumerFieldNotSet(consumerSettings, nameof(AsbConsumerBuilderExtensions.SubscriptionName));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class MessageBusBuilder
public IList<Action<IServiceCollection>> PostConfigurationActions { get; } = new List<Action<IServiceCollection>>();

protected MessageBusBuilder()
{
{
}

protected MessageBusBuilder(MessageBusBuilder other)
Expand Down Expand Up @@ -266,9 +266,22 @@ public MessageBusBuilder WithMessageTypeResolver(Type messageTypeResolverType)
/// <summary>
/// Hook called whenver message is being produced. Can be used to add (or mutate) message headers.
/// </summary>
public MessageBusBuilder WithHeaderModifier(Action<IDictionary<string, object>, object> headerModifierAction)
public MessageBusBuilder WithHeaderModifier(MessageHeaderModifier<object> headerModifier) => WithHeaderModifier<object>(headerModifier);

/// <summary>
/// Hook called whenver message is being produced. Can be used to add (or mutate) message headers.
/// </summary>
public MessageBusBuilder WithHeaderModifier<T>(MessageHeaderModifier<T> headerModifier)
{
Settings.HeaderModifier = headerModifierAction ?? throw new ArgumentNullException(nameof(headerModifierAction));
if (headerModifier == null) throw new ArgumentNullException(nameof(headerModifier));

Settings.HeaderModifier = (headers, message) =>
{
if (message is T typedMessage)
{
headerModifier(headers, typedMessage);
}
};
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,11 @@ public ProducerBuilder<T> DefaultTimeout(TimeSpan timeout)
/// <summary>
/// Hook called whenver message is being produced. Can be used to add (or mutate) message headers.
/// </summary>
public ProducerBuilder<T> WithHeaderModifier(Action<IDictionary<string, object>, T> headerModifierAction)
public ProducerBuilder<T> WithHeaderModifier(MessageHeaderModifier<T> headerModifier)
{
if (headerModifierAction == null) throw new ArgumentNullException(nameof(headerModifierAction));
if (headerModifier == null) throw new ArgumentNullException(nameof(headerModifier));

// ToDo: Introduce delegate (HeaderModifier)
Settings.HeaderModifier = (headers, message) => headerModifierAction(headers, (T)message);
Settings.HeaderModifier = (headers, message) => headerModifier(headers, (T)message);
return this;
}
}
3 changes: 3 additions & 0 deletions src/SlimMessageBus.Host.Configuration/Settings/Delegates.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
namespace SlimMessageBus.Host;

public delegate void MessageHeaderModifier<in T>(IDictionary<string, object> headers, T message);
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public IServiceProvider ServiceProvider
/// <summary>
/// Hook called whenver message is being produced. Can be used to add (or mutate) message headers.
/// </summary>
public Action<IDictionary<string, object>, object> HeaderModifier { get; set; }
// ToDo: Support many modifiers
public MessageHeaderModifier<object> HeaderModifier { get; set; }

/// <summary>
/// When true will start the message consumption on consumers after the bus is created.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ public class ProducerSettings : HasProviderExtensions
/// <summary>
/// Hook called whenver message is being produced. Can be used to add (or mutate) message headers.
/// </summary>
public Action<IDictionary<string, object>, object> HeaderModifier { get; set; }
}
public MessageHeaderModifier<object> HeaderModifier { get; set; }
}
37 changes: 8 additions & 29 deletions src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ namespace SlimMessageBus.Host.Kafka;

using System.Diagnostics.CodeAnalysis;

using SlimMessageBus.Host.Services;

using static Confluent.Kafka.ConfigPropertyNames;

using IProducer = Confluent.Kafka.IProducer<byte[], byte[]>;
using Message = Confluent.Kafka.Message<byte[], byte[]>;

Expand All @@ -26,6 +30,8 @@ public KafkaMessageBus(MessageBusSettings settings, KafkaMessageBusSettings prov
public IMessageSerializer HeaderSerializer
=> ProviderSettings.HeaderSerializer ?? Serializer;

protected override IMessageBusSettingsValidationService ValidationService => new KafkaMessageBusSettingsValidationService(Settings, ProviderSettings);

protected override void Build()
{
base.Build();
Expand Down Expand Up @@ -110,41 +116,14 @@ protected async override Task OnStart()
_logger.LogInformation("Group consumers starting...");
foreach (var groupConsumer in _groupConsumers)
{
groupConsumer.Start();
await groupConsumer.Start();
}
_logger.LogInformation("Group consumers started");
}

private void AddGroupConsumer(string group, IReadOnlyCollection<string> topics, Func<TopicPartition, IKafkaCommitController, IKafkaPartitionConsumer> processorFactory)
=> _groupConsumers.Add(new KafkaGroupConsumer(this, group, topics, processorFactory));

protected override void AssertSettings()
{
base.AssertSettings();

if (string.IsNullOrEmpty(ProviderSettings.BrokerList))
{
throw new ConfigurationMessageBusException(Settings, $"The {nameof(KafkaMessageBusSettings)}.{nameof(KafkaMessageBusSettings.BrokerList)} must be set");
}

foreach (var consumer in Settings.Consumers)
{
Assert.IsTrue(consumer.GetGroup() != null,
() => new ConfigurationMessageBusException($"Consumer ({consumer.MessageType}): group was not provided"));
}

if (Settings.RequestResponse != null)
{
Assert.IsTrue(Settings.RequestResponse.GetGroup() != null,
() => new ConfigurationMessageBusException("Request-response: group was not provided"));

if (Settings.Consumers.Any(x => x.GetGroup() == Settings.RequestResponse.GetGroup() && x.Path == Settings.RequestResponse.Path))
{
throw new ConfigurationMessageBusException("Request-response: cannot use topic that is already being used by a consumer");
}
}
}

#region Overrides of BaseMessageBus

protected override async ValueTask DisposeAsyncCore()
Expand Down Expand Up @@ -254,4 +233,4 @@ protected int GetMessagePartition(ProducerSettings producerSettings, [NotNull] T
}

#endregion
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
namespace SlimMessageBus.Host.Kafka;

using SlimMessageBus.Host.Services;

internal class KafkaMessageBusSettingsValidationService : DefaultMessageBusSettingsValidationService<KafkaMessageBusSettings>
{
public KafkaMessageBusSettingsValidationService(MessageBusSettings settings, KafkaMessageBusSettings providerSettings) : base(settings, providerSettings)
{
}

public override void AssertSettings()
{
base.AssertSettings();

if (string.IsNullOrEmpty(ProviderSettings.BrokerList))
{
ThrowFieldNotSet(nameof(KafkaMessageBusSettings.BrokerList));
}
}

protected override void AssertConsumer(ConsumerSettings consumerSettings)
{
base.AssertConsumer(consumerSettings);

if (consumerSettings.GetGroup() == null)
{
ThrowConsumerFieldNotSet(consumerSettings, nameof(BuilderExtensions.KafkaGroup));
}
}

protected override void AssertRequestResponseSettings()
{
base.AssertRequestResponseSettings();

if (Settings.RequestResponse != null)
{
if (Settings.RequestResponse.GetGroup() == null)
{
ThrowRequestResponseFieldNotSet(nameof(BuilderExtensions.KafkaGroup));
}

if (Settings.Consumers.Any(x => x.GetGroup() == Settings.RequestResponse.GetGroup() && x.Path == Settings.RequestResponse.Path))
{
ThrowRequestResponseFieldNotSet(nameof(BuilderExtensions.KafkaGroup), "cannot use topic that is already being used by a consumer");
}
}
}
}
2 changes: 2 additions & 0 deletions src/SlimMessageBus.Host.Mqtt/GlobalUsings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@
global using MQTTnet;
global using MQTTnet.Client;
global using MQTTnet.Extensions.ManagedClient;

global using SlimMessageBus.Host.Services;
Loading

0 comments on commit 934f45c

Please sign in to comment.