Skip to content

Commit

Permalink
[Host.Kafka] Fix how PartitionEOF offset is comitted
Browse files Browse the repository at this point in the history
[Host.Kafka] Upgrade to latest Confluent.Kafka.Net library.
[Host.Kafka] Extend the kafka docs.
[Host.Kafka] Use test containers for Kafka int tests.

Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
  • Loading branch information
zarusz committed Sep 16, 2024
1 parent 5ea6856 commit 9def8b3
Show file tree
Hide file tree
Showing 14 changed files with 156 additions and 108 deletions.
22 changes: 13 additions & 9 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ jobs:
working-directory: ./src

- name: Install Coverlet
#if: false
run: find . -name "*.Test.csproj" | xargs -t -I {} dotnet add {} package coverlet.collector
working-directory: ./src

Expand Down Expand Up @@ -88,6 +89,7 @@ jobs:
working-directory: ./src

- name: Unit Tests
#if: false
run: |
dotnet test $SOLUTION_NAME \
--configuration $SOLUTION_CONFIGURATION \
Expand All @@ -110,7 +112,8 @@ jobs:
--verbosity normal \
--logger "trx;LogFilePrefix=Integration" \
--collect:"XPlat Code Coverage;Format=opencover" \
--filter "Category=Integration"
--filter "Category=Integration"
#--filter "Category=Integration&Transport=Kafka"
working-directory: ./src
env:
# Connects to the Azure cloud
Expand All @@ -119,10 +122,10 @@ jobs:
azure_eventhub_connectionstring: ${{ secrets.azure_eventhub_connectionstring }}
azure_storagecontainer_connectionstring: ${{ secrets.azure_storagecontainer_connectionstring }}

kafka_brokers: ${{ secrets.kafka_brokers }}
kafka_username: ${{ secrets.kafka_username }}
kafka_password: ${{ secrets.kafka_password }}
kafka_secure: ${{ secrets.kafka_secure }}
#kafka_brokers: ${{ secrets.kafka_brokers }}
#kafka_username: ${{ secrets.kafka_username }}
#kafka_password: ${{ secrets.kafka_password }}
#kafka_secure: ${{ secrets.kafka_secure }}

_mqtt_server: ${{ secrets.mqtt_server }}
_mqtt_port: ${{ secrets.mqtt_port }}
Expand All @@ -137,10 +140,10 @@ jobs:
sqlserver_connectionstring: ${{ secrets.sqlserver_connectionstring }}

# Connects to the local Test Containers
_kafka_brokers: localhost:9092
_kafka_username: user
_kafka_password: password
_kafka_secure: false
kafka_brokers: localhost:9092
kafka_username: user
kafka_password: password
kafka_secure: false

mqtt_server: localhost
mqtt_port: 1883
Expand Down Expand Up @@ -192,6 +195,7 @@ jobs:
name: .NET Tests
path: ./test-results/*.trx
reporter: dotnet-trx
fail-on-error: false

- name: Copy NuGet packages
shell: bash
Expand Down
30 changes: 30 additions & 0 deletions docs/provider_kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Please read the [Introduction](intro.md) before reading this provider documentat
- [Consumers](#consumers)
- [Offset Commit](#offset-commit)
- [Consumer Error Handling](#consumer-error-handling)
- [Debugging](#debugging)
- [Deployment](#deployment)

## Underlying client
Expand Down Expand Up @@ -246,6 +247,35 @@ The error handler can perform the following actions:

If no custom error handler is provided, the provider logs the exception and moves on to process the next message.

### Debugging

Kafka uses a sophisticated protocol for partition assignment:

- Partition assignments may change due to factors like rebalancing.
- A running consumer instance might not receive any partitions if there are more consumers than partitions for a given topic.

To better understand what's happening, you can enable Debug level logging in your library, such as SlimMessageBus.Host.Kafka.KafkaGroupConsumer.

At this logging level, you can track the lifecycle events of the consumer group:

```
[00:03:06 INF] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Subscribing to topics: 4p5ma6io-test-ping
[00:03:06 INF] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Consumer loop started
[00:03:12 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Assigned partition, Topic: 4p5ma6io-test-ping, Partition: [0]
[00:03:12 INF] SlimMessageBus.Host.Kafka.KafkaPartitionConsumer Creating consumer for Group: subscriber, Topic: 4p5ma6io-test-ping, Partition: [0]
[00:03:12 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Assigned partition, Topic: 4p5ma6io-test-ping, Partition: [1]
[00:03:12 INF] SlimMessageBus.Host.Kafka.KafkaPartitionConsumer Creating consumer for Group: subscriber, Topic: 4p5ma6io-test-ping, Partition: [1]
...
[00:03:15 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Received message with Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98578, payload size: 57
[00:03:15 INF] SlimMessageBus.Host.Kafka.Test.KafkaMessageBusIt.PingConsumer Got message 073 on topic 4p5ma6io-test-ping.
[00:03:15 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Received message with Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98579, payload size: 57
[00:03:15 INF] SlimMessageBus.Host.Kafka.Test.KafkaMessageBusIt.PingConsumer Got message 075 on topic 4p5ma6io-test-ping.
[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Reached end of partition, Topic: 4p5ma6io-test-ping, Partition: [0], Offset: 100403
[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Commit Offset, Topic: 4p5ma6io-test-ping, Partition: [0], Offset: 100402
[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Reached end of partition, Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98580
[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Commit Offset, Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98579
```

## Deployment

The `librdkafka` distribution for Windows requires [Visual C++ Redistributable for 2013](https://www.microsoft.com/en-US/download/details.aspx?id=40784) installed on the server. More information can be found [here](https://www.microsoft.com/en-US/download/details.aspx?id=40784).
30 changes: 30 additions & 0 deletions docs/provider_kafka.t.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Please read the [Introduction](intro.md) before reading this provider documentat
- [Consumers](#consumers)
- [Offset Commit](#offset-commit)
- [Consumer Error Handling](#consumer-error-handling)
- [Debugging](#debugging)
- [Deployment](#deployment)

## Underlying client
Expand Down Expand Up @@ -236,6 +237,35 @@ The error handler can perform the following actions:

If no custom error handler is provided, the provider logs the exception and moves on to process the next message.

### Debugging

Kafka uses a sophisticated protocol for partition assignment:

- Partition assignments may change due to factors like rebalancing.
- A running consumer instance might not receive any partitions if there are more consumers than partitions for a given topic.

To better understand what's happening, you can enable Debug level logging in your library, such as SlimMessageBus.Host.Kafka.KafkaGroupConsumer.

At this logging level, you can track the lifecycle events of the consumer group:

```
[00:03:06 INF] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Subscribing to topics: 4p5ma6io-test-ping
[00:03:06 INF] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Consumer loop started
[00:03:12 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Assigned partition, Topic: 4p5ma6io-test-ping, Partition: [0]
[00:03:12 INF] SlimMessageBus.Host.Kafka.KafkaPartitionConsumer Creating consumer for Group: subscriber, Topic: 4p5ma6io-test-ping, Partition: [0]
[00:03:12 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Assigned partition, Topic: 4p5ma6io-test-ping, Partition: [1]
[00:03:12 INF] SlimMessageBus.Host.Kafka.KafkaPartitionConsumer Creating consumer for Group: subscriber, Topic: 4p5ma6io-test-ping, Partition: [1]
...
[00:03:15 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Received message with Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98578, payload size: 57
[00:03:15 INF] SlimMessageBus.Host.Kafka.Test.KafkaMessageBusIt.PingConsumer Got message 073 on topic 4p5ma6io-test-ping.
[00:03:15 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Received message with Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98579, payload size: 57
[00:03:15 INF] SlimMessageBus.Host.Kafka.Test.KafkaMessageBusIt.PingConsumer Got message 075 on topic 4p5ma6io-test-ping.
[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Reached end of partition, Topic: 4p5ma6io-test-ping, Partition: [0], Offset: 100403
[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Commit Offset, Topic: 4p5ma6io-test-ping, Partition: [0], Offset: 100402
[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Reached end of partition, Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98580
[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Commit Offset, Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98579
```

## Deployment

The `librdkafka` distribution for Windows requires [Visual C++ Redistributable for 2013](https://www.microsoft.com/en-US/download/details.aspx?id=40784) installed on the server. More information can be found [here](https://www.microsoft.com/en-US/download/details.aspx?id=40784).
10 changes: 4 additions & 6 deletions src/Infrastructure/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version: '3.4'
version: "3.4"

services:
zookeeper:
Expand All @@ -16,13 +16,13 @@ services:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_CREATE_TOPICS: "user-test-ping:2:1,user-test-echo:2:1"
KAFKA_CREATE_TOPICS: "user-test-ping:2:1,user-test-echo:2:1,user-test-echo-resp:2:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
depends_on:
- zookeeper
networks:
- slim

mqtt:
container_name: slim.mqtt
image: eclipse-mosquitto:2.0.18
Expand Down Expand Up @@ -74,16 +74,14 @@ services:
- "11002:11002"
networks:
- slim

nats:
container_name: slim.nats
image: nats:2.10
ports:
- 4222:4222
networks:
- slim


networks:
slim: {}

Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public interface IKafkaPartitionConsumer : IDisposable

void OnPartitionAssigned(TopicPartition partition);
Task OnMessage(ConsumeResult message);
void OnPartitionEndReached(TopicPartitionOffset offset);
void OnPartitionEndReached();
void OnPartitionRevoked();

void OnClose();
Expand Down
8 changes: 4 additions & 4 deletions src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ protected IConsumer CreateConsumer(string group)
// ToDo: add support for auto commit
config.EnableAutoCommit = false;
// Notify when we reach EoF, so that we can do a manual commit
config.EnablePartitionEof = true;
config.EnablePartitionEof = true;

var consumer = MessageBus.ProviderSettings.ConsumerBuilderFactory(config)
.SetStatisticsHandler((_, json) => OnStatistics(json))
.SetStatisticsHandler((_, json) => OnStatistics(json))
.SetPartitionsAssignedHandler((_, partitions) => OnPartitionAssigned(partitions))
.SetPartitionsRevokedHandler((_, partitions) => OnPartitionRevoked(partitions))
.SetOffsetsCommittedHandler((_, offsets) => OnOffsetsCommitted(offsets))
Expand Down Expand Up @@ -201,7 +201,7 @@ protected virtual void OnPartitionEndReached(TopicPartitionOffset offset)
Logger.LogDebug("Group [{Group}]: Reached end of partition, Topic: {Topic}, Partition: {Partition}, Offset: {Offset}", Group, offset.Topic, offset.Partition, offset.Offset);

var processor = _processors[offset.TopicPartition];
processor.OnPartitionEndReached(offset);
processor.OnPartitionEndReached();
}

protected async virtual ValueTask OnMessage(ConsumeResult message)
Expand All @@ -220,7 +220,7 @@ protected virtual void OnOffsetsCommitted(CommittedOffsets e)
}
else
{
Logger.LogTrace("Group [{Group}]: Successfully committed offsets: [{Offsets}]", Group, string.Join(", ", e.Offsets));
Logger.LogDebug("Group [{Group}]: Successfully committed offsets: [{Offsets}]", Group, string.Join(", ", e.Offsets));
}
}

Expand Down
25 changes: 18 additions & 7 deletions src/SlimMessageBus.Host.Kafka/Consumer/KafkaPartitionConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ protected KafkaPartitionConsumer(ILoggerFactory loggerFactory, AbstractConsumerS

private ICheckpointTrigger CreateCheckpointTrigger()
{
var f = new CheckpointTriggerFactory(LoggerFactory, (configuredCheckpoints) => $"The checkpoint settings ({nameof(BuilderExtensions.CheckpointAfter)} and {nameof(BuilderExtensions.CheckpointEvery)}) across all the consumers that use the same Topic {TopicPartition.Topic} and Group {Group} must be the same (found settings are: {string.Join(", ", configuredCheckpoints)})");
var f = new CheckpointTriggerFactory(
LoggerFactory,
(configuredCheckpoints) => $"The checkpoint settings ({nameof(BuilderExtensions.CheckpointAfter)} and {nameof(BuilderExtensions.CheckpointEvery)}) across all the consumers that use the same Topic {TopicPartition.Topic} and Group {Group} must be the same (found settings are: {string.Join(", ", configuredCheckpoints)})");

return f.Create(ConsumerSettings);
}

Expand Down Expand Up @@ -101,10 +104,21 @@ public async Task OnMessage(ConsumeResult message)
_lastOffset = message.TopicPartitionOffset;

var messageHeaders = message.ToHeaders(_headerSerializer);

// Log in trace level all the message headers converted to string
if (_logger.IsEnabled(LogLevel.Trace))
{
foreach (var header in messageHeaders)
{
_logger.LogTrace("Group [{Group}]: Topic: {Topic}, Partition: {Partition}, Offset: {Offset}, Message Header: {HeaderKey}={HeaderValue}", Group, message.TopicPartitionOffset.Topic, message.TopicPartitionOffset.Partition, message.TopicPartitionOffset.Offset, header.Key, header.Value);
}
}

var r = await _messageProcessor.ProcessMessage(message, messageHeaders, cancellationToken: _cancellationTokenSource.Token).ConfigureAwait(false);
if (r.Exception != null)
{
// ToDo: Retry logic
_logger.LogWarning(r.Exception, "Group [{Group}]: Error occurred while consuming a message at Topic: {Topic}, Partition: {Partition}, Offset: {Offset}, Error: {ErrorMessage}", Group, message.Topic, message.Partition, message.Offset, r.Exception.Message);
// The IKafkaErrorHandler was called at this point by the MessageProcessor.
// The OnMessageFaulted was called at this point by the MessageProcessor.
}

Expand All @@ -120,14 +134,11 @@ public async Task OnMessage(ConsumeResult message)
}
}

public void OnPartitionEndReached(TopicPartitionOffset offset)
public void OnPartitionEndReached()
{
if (CheckpointTrigger != null)
{
if (offset != null)
{
Commit(offset);
}
Commit(_lastOffset);
}
}

Expand Down
10 changes: 8 additions & 2 deletions src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,14 @@ protected override async Task<ProduceToTransportBulkResult<T>> ProduceToTranspor
? GetMessagePartition(producerSettings, messageType, envelope.Message, path)
: NoPartition;

_logger.LogTrace("Producing message {Message} of type {MessageType}, on topic {Topic}, partition {Partition}, key size {KeySize}, payload size {MessageSize}",
envelope.Message, messageType?.Name, path, partition, key?.Length ?? 0, messagePayload?.Length ?? 0);
_logger.LogDebug("Producing message {Message} of type {MessageType}, topic {Topic}, partition {Partition}, key size {KeySize}, payload size {MessageSize}, headers count {MessageHeaderCount}",
envelope.Message,
messageType?.Name,
path,
partition,
key?.Length ?? 0,
messagePayload?.Length ?? 0,
kafkaMessage.Headers?.Count ?? 0);

// send the message to topic
var task = partition == NoPartition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="2.3.0" />
<PackageReference Include="Confluent.Kafka" Version="2.5.3" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public async Task When_OnPartitionEndReached_Then_ShouldCommit()
await _subject.Value.OnMessage(message);

// act
_subject.Value.OnPartitionEndReached(message.TopicPartitionOffset);
_subject.Value.OnPartitionEndReached();

// assert
_commitControllerMock.Verify(x => x.Commit(message.TopicPartitionOffset), Times.Once);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,17 @@ public void When_NewInstance_Then_TopicPartitionSet()
}

[Fact]
public void When_OnPartitionEndReached_Then_ShouldCommit()
public async Task When_OnPartitionEndReached_Then_ShouldCommit()
{
// arrange
var partition = new TopicPartitionOffset(_topicPartition, new Offset(10));
var message = GetSomeMessage();

_subject.OnPartitionAssigned(_topicPartition);
await _subject.OnMessage(message);

// act
_subject.OnPartitionEndReached(partition);
_subject.OnPartitionEndReached();

// assert
_commitControllerMock.Verify(x => x.Commit(partition), Times.Once);
Expand Down
Loading

0 comments on commit 9def8b3

Please sign in to comment.