Skip to content

Commit

Permalink
[Host.Kafka] Kafka consumers started twice on Host.Run #213
Browse files Browse the repository at this point in the history
Upgrade to latest libs
Adjust logging when starting consumers

Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
  • Loading branch information
zarusz committed Feb 29, 2024
1 parent 3d507b2 commit 85791c0
Show file tree
Hide file tree
Showing 35 changed files with 216 additions and 146 deletions.
2 changes: 1 addition & 1 deletion src/Host.Plugin.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<PropertyGroup>
<TargetFrameworks>netstandard2.0;net6.0;net8.0</TargetFrameworks>
<Version>2.2.2</Version>
<Version>2.2.3</Version>
</PropertyGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="8.0.1" />
<PackageReference Include="NSwag.AspNetCore" Version="14.0.2" />
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="8.0.2" />
<PackageReference Include="NSwag.AspNetCore" Version="14.0.3" />
<ProjectReference Include="..\..\SlimMessageBus.Host.AsyncApi\SlimMessageBus.Host.AsyncApi.csproj" />
<ProjectReference Include="..\..\SlimMessageBus.Host.AzureServiceBus\SlimMessageBus.Host.AzureServiceBus.csproj" />
<ProjectReference Include="..\..\SlimMessageBus.Host.Kafka\SlimMessageBus.Host.Kafka.csproj" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="8.0.0" />
<PackageReference Include="Swashbuckle.AspNetCore" Version="6.5.0" />
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="8.0.1" />
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="8.0.2" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="8.0.0" />
<PackageReference Include="System.Drawing.Common" Version="8.0.1" />
<PackageReference Include="System.Drawing.Common" Version="8.0.2" />
</ItemGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion src/Samples/Sample.OutboxWebApi/Sample.OutboxWebApi.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="8.0.1" />
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="8.0.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="8.0.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Azure.Messaging.EventHubs.Processor" Version="5.10.0" />
<PackageReference Include="Azure.Messaging.EventHubs.Processor" Version="5.11.0" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.17.2" />
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.17.3" />
</ItemGroup>

<ItemGroup>
Expand Down
6 changes: 3 additions & 3 deletions src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ protected override void Build()
{
base.Build();

_logger.LogInformation("Creating producers");
_logger.LogInformation("Creating producers for {BusName} bus...", Name);
_producer = CreateProducerInternal();
}

Expand All @@ -42,7 +42,7 @@ public void Flush()
public IProducer CreateProducerInternal()
{
_logger.LogTrace("Creating producer settings");
var config = new ProducerConfig()
var config = new ProducerConfig
{
BootstrapServers = ProviderSettings.BrokerList
};
Expand Down Expand Up @@ -127,7 +127,7 @@ protected override async Task ProduceToTransport(object message, string path, by

if (messageHeaders != null && messageHeaders.Count > 0)
{
kafkaMessage.Headers = new Headers();
kafkaMessage.Headers = [];

foreach (var keyValue in messageHeaders)
{
Expand Down
1 change: 0 additions & 1 deletion src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ void AddTopicConsumer(string topic, IMessageProcessor<MqttApplicationMessage> me
AddConsumer(consumer);
}

_logger.LogInformation("Creating consumers");
foreach (var (path, consumerSettings) in Settings.Consumers.GroupBy(x => x.Path).ToDictionary(x => x.Key, x => x.ToList()))
{
var processor = new MessageProcessor<MqttApplicationMessage>(consumerSettings, this, MessageProvider, path, responseProducer: this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Data.SqlClient" Version="5.1.4" />
<PackageReference Include="Microsoft.Data.SqlClient" Version="5.2.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="3.1.0" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="StackExchange.Redis" Version="2.7.17" />
<PackageReference Include="StackExchange.Redis" Version="2.7.27" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Google.Protobuf" Version="3.25.2" />
<PackageReference Include="Google.Protobuf" Version="3.25.3" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="3.1.0" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,15 @@
/// <summary>
/// <see cref="IHostedService"/> responsible for starting message bus consumers.
/// </summary>
public class MessageBusHostedService : IHostedService
public class MessageBusHostedService(IConsumerControl bus, MessageBusSettings messageBusSettings) : IHostedService
{
private readonly IConsumerControl _bus;
private readonly MessageBusSettings _messageBusSettings;

public MessageBusHostedService(IConsumerControl bus, MessageBusSettings messageBusSettings)
{
_bus = bus;
_messageBusSettings = messageBusSettings;
}

public async Task StartAsync(CancellationToken cancellationToken)
{
if (_messageBusSettings.AutoStartConsumers)
if (messageBusSettings.AutoStartConsumers)
{
await _bus.Start();
await bus.Start();
}
}

public Task StopAsync(CancellationToken cancellationToken) => _bus.Stop();
public Task StopAsync(CancellationToken cancellationToken) => bus.Stop();
}
119 changes: 71 additions & 48 deletions src/SlimMessageBus.Host/MessageBusBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public abstract class MessageBusBase : IDisposable, IAsyncDisposable, IMasterMes
private CancellationTokenSource _cancellationTokenSource = new();
private IMessageSerializer _serializer;
private readonly MessageHeaderService _headerService;
private readonly List<AbstractConsumer> _consumers = new();
private readonly List<AbstractConsumer> _consumers = [];

/// <summary>
/// Special market reference that signifies a dummy producer settings for response types.
Expand Down Expand Up @@ -60,10 +60,11 @@ public virtual IMessageSerializer Serializer
#endregion

private readonly object _initTaskLock = new();

private Task _initTask = null;

#region Start & Stop

private readonly object _startLock = new();

public bool IsStarted { get; private set; }

Expand Down Expand Up @@ -201,58 +202,80 @@ private async Task OnBusLifecycle(MessageBusLifecycleEventType eventType)
}

public async Task Start()
{
if (!IsStarted && !IsStarting)
{
IsStarting = true;
try
{
await EnsureInitFinished();

_logger.LogInformation("Starting consumers for {BusName} bus...", Name);
await OnBusLifecycle(MessageBusLifecycleEventType.Starting).ConfigureAwait(false);
{
lock (_startLock)
{
if (IsStarting || IsStarted)
{
return;
}
IsStarting = true;
}

try
{
await EnsureInitFinished();

await CreateConsumers();
await OnStart().ConfigureAwait(false);
await Task.WhenAll(_consumers.Select(x => x.Start())).ConfigureAwait(false);
_logger.LogInformation("Starting consumers for {BusName} bus...", Name);
await OnBusLifecycle(MessageBusLifecycleEventType.Starting).ConfigureAwait(false);

await OnBusLifecycle(MessageBusLifecycleEventType.Started).ConfigureAwait(false);
_logger.LogInformation("Started consumers for {BusName} bus", Name);

IsStarted = true;
}
finally
{
IsStarting = false;
}
await CreateConsumers();
await OnStart().ConfigureAwait(false);
await Task.WhenAll(_consumers.Select(x => x.Start())).ConfigureAwait(false);

await OnBusLifecycle(MessageBusLifecycleEventType.Started).ConfigureAwait(false);
_logger.LogInformation("Started consumers for {BusName} bus", Name);

lock (_startLock)
{
IsStarted = true;
}
}
finally
{
lock (_startLock)
{
IsStarting = false;
}
}
}

public async Task Stop()
{
if (IsStarted && !IsStopping)
{
IsStopping = true;
try
{
lock (_startLock)
{
if (IsStopping || !IsStarted)
{
await EnsureInitFinished();

_logger.LogInformation("Stopping consumers for {BusName} bus...", Name);
await OnBusLifecycle(MessageBusLifecycleEventType.Stopping).ConfigureAwait(false);
return;
}
IsStopping = true;
}

await Task.WhenAll(_consumers.Select(x => x.Stop())).ConfigureAwait(false);
await OnStop().ConfigureAwait(false);
await DestroyConsumers().ConfigureAwait(false);

await OnBusLifecycle(MessageBusLifecycleEventType.Stopped).ConfigureAwait(false);
_logger.LogInformation("Stopped consumers for {BusName} bus", Name);

IsStarted = false;
}
finally
{
IsStopping = false;
}
try
{
await EnsureInitFinished();

_logger.LogInformation("Stopping consumers for {BusName} bus...", Name);
await OnBusLifecycle(MessageBusLifecycleEventType.Stopping).ConfigureAwait(false);

await Task.WhenAll(_consumers.Select(x => x.Stop())).ConfigureAwait(false);
await OnStop().ConfigureAwait(false);
await DestroyConsumers().ConfigureAwait(false);

await OnBusLifecycle(MessageBusLifecycleEventType.Stopped).ConfigureAwait(false);
_logger.LogInformation("Stopped consumers for {BusName} bus", Name);

lock (_startLock)
{
IsStarted = false;
}
}
finally
{
lock (_startLock)
{
IsStopping = false;
}
}
}

Expand Down Expand Up @@ -338,13 +361,13 @@ protected async virtual ValueTask DisposeAsyncCore()

protected virtual Task CreateConsumers()
{
_logger.LogInformation("Creating consumers");
_logger.LogInformation("Creating consumers for {BusName} bus...", Name);
return Task.CompletedTask;
}

protected async virtual Task DestroyConsumers()
{
_logger.LogInformation("Destroying consumers");
_logger.LogInformation("Destroying consumers for {BusName} bus...", Name);

foreach (var consumer in _consumers)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.8.0" />
<PackageReference Include="xunit" Version="2.6.6" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.5.6">
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.9.0" />
<PackageReference Include="xunit" Version="2.7.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.5.7">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="coverlet.collector" Version="6.0.0">
<PackageReference Include="coverlet.collector" Version="6.0.1">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="8.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.8.0" />
<PackageReference Include="xunit" Version="2.6.6" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.5.6">
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.9.0" />
<PackageReference Include="xunit" Version="2.7.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.5.7">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
</PackageReference>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="8.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.8.0" />
<PackageReference Include="xunit" Version="2.6.6" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.5.6">
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.9.0" />
<PackageReference Include="xunit" Version="2.7.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.5.7">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,8 @@

public class MessageBusBuilderTests
{
internal class DerivedMessageBusBuilder : MessageBusBuilder
internal class DerivedMessageBusBuilder(MessageBusBuilder other) : MessageBusBuilder(other)
{
public DerivedMessageBusBuilder(MessageBusBuilder other) : base(other)
{
}
}

[Fact]
Expand Down Expand Up @@ -90,14 +87,14 @@ public void Given_OtherBuilder_When_CopyConstructorUsed_Then_AllStateIsCopied()
// arrange
var subject = MessageBusBuilder.Create();
subject.WithProvider(Mock.Of<Func<MessageBusSettings, IMessageBus>>());
subject.AddChildBus("Bus1", mbb => { });
subject.AddChildBus("Bus1", mbb => { });

// act
var copy = new DerivedMessageBusBuilder(subject);

// assert
copy.Settings.Should().BeSameAs(subject.Settings);
copy.Settings.Name.Should().BeSameAs(subject.Settings.Name);
copy.Settings.Name.Should().BeSameAs(subject.Settings.Name);
copy.Children.Should().BeSameAs(subject.Children);
copy.BusFactory.Should().BeSameAs(subject.BusFactory);
copy.PostConfigurationActions.Should().BeSameAs(subject.PostConfigurationActions);
Expand Down
Loading

0 comments on commit 85791c0

Please sign in to comment.