Skip to content

Commit

Permalink
[Host.Kafka] Fix how PartitionEOF offset is comitted
Browse files Browse the repository at this point in the history
[Host.Kafka] Upgrade to latest Confluent.Kafka.Net library.
[Host.Kafka] Extend the kafka docs.
[Host.Kafka] Use test containers for Kafka int tests.

Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
  • Loading branch information
zarusz committed Sep 16, 2024
1 parent 5ea6856 commit 2ff704b
Show file tree
Hide file tree
Showing 16 changed files with 192 additions and 137 deletions.
18 changes: 10 additions & 8 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ jobs:
working-directory: ./src

- name: Install Coverlet
#if: false
run: find . -name "*.Test.csproj" | xargs -t -I {} dotnet add {} package coverlet.collector
working-directory: ./src

Expand Down Expand Up @@ -119,10 +120,10 @@ jobs:
azure_eventhub_connectionstring: ${{ secrets.azure_eventhub_connectionstring }}
azure_storagecontainer_connectionstring: ${{ secrets.azure_storagecontainer_connectionstring }}

kafka_brokers: ${{ secrets.kafka_brokers }}
kafka_username: ${{ secrets.kafka_username }}
kafka_password: ${{ secrets.kafka_password }}
kafka_secure: ${{ secrets.kafka_secure }}
#kafka_brokers: ${{ secrets.kafka_brokers }}
#kafka_username: ${{ secrets.kafka_username }}
#kafka_password: ${{ secrets.kafka_password }}
#kafka_secure: ${{ secrets.kafka_secure }}

_mqtt_server: ${{ secrets.mqtt_server }}
_mqtt_port: ${{ secrets.mqtt_port }}
Expand All @@ -137,10 +138,10 @@ jobs:
sqlserver_connectionstring: ${{ secrets.sqlserver_connectionstring }}

# Connects to the local Test Containers
_kafka_brokers: localhost:9092
_kafka_username: user
_kafka_password: password
_kafka_secure: false
kafka_brokers: localhost:9092
kafka_username: user
kafka_password: password
kafka_secure: false

mqtt_server: localhost
mqtt_port: 1883
Expand Down Expand Up @@ -192,6 +193,7 @@ jobs:
name: .NET Tests
path: ./test-results/*.trx
reporter: dotnet-trx
fail-on-error: false

- name: Copy NuGet packages
shell: bash
Expand Down
36 changes: 33 additions & 3 deletions docs/provider_kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Please read the [Introduction](intro.md) before reading this provider documentat
- [Consumers](#consumers)
- [Offset Commit](#offset-commit)
- [Consumer Error Handling](#consumer-error-handling)
- [Debugging](#debugging)
- [Deployment](#deployment)

## Underlying client
Expand All @@ -22,9 +23,9 @@ The SMB Kafka implementation uses [confluent-kafka-dotnet](https://github.com/co

When troubleshooting or fine tuning it is worth reading the `librdkafka` and `confluent-kafka-dotnet` docs:

- [Introduction](https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md
- [Broker version compatibility](https://github.com/edenhill/librdkafka/wiki/Broker-version-compatibility)
- [Using SSL with librdkafka](https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka)
- [Introduction](https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md)
- [Broker version compatibility](https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#broker-version-compatibility)
- [Using SSL with librdkafka](https://github.com/confluentinc/librdkafka/wiki/Using-SSL-with-librdkafka)

## Configuration properties

Expand Down Expand Up @@ -246,6 +247,35 @@ The error handler can perform the following actions:

If no custom error handler is provided, the provider logs the exception and moves on to process the next message.

### Debugging

Kafka uses a sophisticated protocol for partition assignment:

- Partition assignments may change due to factors like rebalancing.
- A running consumer instance might not receive any partitions if there are more consumers than partitions for a given topic.

To better understand what's happening, you can enable Debug level logging in your library, such as SlimMessageBus.Host.Kafka.KafkaGroupConsumer.

At this logging level, you can track the lifecycle events of the consumer group:

```
[00:03:06 INF] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Subscribing to topics: 4p5ma6io-test-ping
[00:03:06 INF] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Consumer loop started
[00:03:12 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Assigned partition, Topic: 4p5ma6io-test-ping, Partition: [0]
[00:03:12 INF] SlimMessageBus.Host.Kafka.KafkaPartitionConsumer Creating consumer for Group: subscriber, Topic: 4p5ma6io-test-ping, Partition: [0]
[00:03:12 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Assigned partition, Topic: 4p5ma6io-test-ping, Partition: [1]
[00:03:12 INF] SlimMessageBus.Host.Kafka.KafkaPartitionConsumer Creating consumer for Group: subscriber, Topic: 4p5ma6io-test-ping, Partition: [1]
...
[00:03:15 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Received message with Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98578, payload size: 57
[00:03:15 INF] SlimMessageBus.Host.Kafka.Test.KafkaMessageBusIt.PingConsumer Got message 073 on topic 4p5ma6io-test-ping.
[00:03:15 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Received message with Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98579, payload size: 57
[00:03:15 INF] SlimMessageBus.Host.Kafka.Test.KafkaMessageBusIt.PingConsumer Got message 075 on topic 4p5ma6io-test-ping.
[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Reached end of partition, Topic: 4p5ma6io-test-ping, Partition: [0], Offset: 100403
[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Commit Offset, Topic: 4p5ma6io-test-ping, Partition: [0], Offset: 100402
[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Reached end of partition, Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98580
[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Commit Offset, Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98579
```

## Deployment

The `librdkafka` distribution for Windows requires [Visual C++ Redistributable for 2013](https://www.microsoft.com/en-US/download/details.aspx?id=40784) installed on the server. More information can be found [here](https://www.microsoft.com/en-US/download/details.aspx?id=40784).
43 changes: 37 additions & 6 deletions docs/provider_kafka.t.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Please read the [Introduction](intro.md) before reading this provider documentat
- [Consumers](#consumers)
- [Offset Commit](#offset-commit)
- [Consumer Error Handling](#consumer-error-handling)
- [Debugging](#debugging)
- [Deployment](#deployment)

## Underlying client
Expand All @@ -22,13 +23,13 @@ The SMB Kafka implementation uses [confluent-kafka-dotnet](https://github.com/co

When troubleshooting or fine tuning it is worth reading the `librdkafka` and `confluent-kafka-dotnet` docs:

- [Introduction](https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md
- [Broker version compatibility](https://github.com/edenhill/librdkafka/wiki/Broker-version-compatibility)
- [Using SSL with librdkafka](https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka)
- [Introduction](https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md)
- [Broker version compatibility](https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#broker-version-compatibility)
- [Using SSL with librdkafka](https://github.com/confluentinc/librdkafka/wiki/Using-SSL-with-librdkafka)

## Configuration properties

Producer, consumer and global configuration properties are described [here](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).
Producer, consumer and global configuration properties are described [here](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md).
The configuration on the underlying Kafka client can be adjusted like so:

```cs
Expand All @@ -52,7 +53,7 @@ services.AddSlimMessageBus(mbb =>

### Minimizing message latency

There is a good description [here](https://github.com/edenhill/librdkafka/wiki/How-to-decrease-message-latency) on improving the latency by applying producer/consumer settings on librdkafka. Here is how you enter the settings using SlimMessageBus:
There is a good description [here](https://github.com/confluentinc/librdkafka/wiki/How-to-decrease-message-latency) on improving the latency by applying producer/consumer settings on librdkafka. Here is how you enter the settings using SlimMessageBus:

```cs
services.AddSlimMessageBus(mbb =>
Expand Down Expand Up @@ -210,7 +211,8 @@ mbb

### Offset Commit

In the current Kafka provider implementation, SMB handles the manual commit of topic-partition offsets for the consumer. This configuration is controlled through the following methods on the consumer builder:
In the current Kafka provider implementation, SMB handles the manual commit of topic-partition offsets for the consumer.Th
is configuration is controlled through the following methods on the consumer builder:

- `CheckpointEvery(int)` – Commits the offset after a specified number of processed messages.
- `CheckpointAfter(TimeSpan)` – Commits the offset after a specified time interval.
Expand All @@ -236,6 +238,35 @@ The error handler can perform the following actions:

If no custom error handler is provided, the provider logs the exception and moves on to process the next message.

### Debugging

Kafka uses a sophisticated protocol for partition assignment:

- Partition assignments may change due to factors like rebalancing.
- A running consumer instance might not receive any partitions if there are more consumers than partitions for a given topic.

To better understand what's happening, you can enable Debug level logging in your library, such as SlimMessageBus.Host.Kafka.KafkaGroupConsumer.

At this logging level, you can track the lifecycle events of the consumer group:

```
[00:03:06 INF] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Subscribing to topics: 4p5ma6io-test-ping
[00:03:06 INF] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Consumer loop started
[00:03:12 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Assigned partition, Topic: 4p5ma6io-test-ping, Partition: [0]
[00:03:12 INF] SlimMessageBus.Host.Kafka.KafkaPartitionConsumer Creating consumer for Group: subscriber, Topic: 4p5ma6io-test-ping, Partition: [0]
[00:03:12 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Assigned partition, Topic: 4p5ma6io-test-ping, Partition: [1]
[00:03:12 INF] SlimMessageBus.Host.Kafka.KafkaPartitionConsumer Creating consumer for Group: subscriber, Topic: 4p5ma6io-test-ping, Partition: [1]
...
[00:03:15 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Received message with Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98578, payload size: 57
[00:03:15 INF] SlimMessageBus.Host.Kafka.Test.KafkaMessageBusIt.PingConsumer Got message 073 on topic 4p5ma6io-test-ping.
[00:03:15 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Received message with Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98579, payload size: 57
[00:03:15 INF] SlimMessageBus.Host.Kafka.Test.KafkaMessageBusIt.PingConsumer Got message 075 on topic 4p5ma6io-test-ping.
[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Reached end of partition, Topic: 4p5ma6io-test-ping, Partition: [0], Offset: 100403
[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Commit Offset, Topic: 4p5ma6io-test-ping, Partition: [0], Offset: 100402
[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Reached end of partition, Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98580
[00:03:16 DBG] SlimMessageBus.Host.Kafka.KafkaGroupConsumer Group [subscriber]: Commit Offset, Topic: 4p5ma6io-test-ping, Partition: [1], Offset: 98579
```

## Deployment

The `librdkafka` distribution for Windows requires [Visual C++ Redistributable for 2013](https://www.microsoft.com/en-US/download/details.aspx?id=40784) installed on the server. More information can be found [here](https://www.microsoft.com/en-US/download/details.aspx?id=40784).
2 changes: 1 addition & 1 deletion src/Host.Plugin.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<Import Project="Common.NuGet.Properties.xml" />

<PropertyGroup>
<Version>2.5.3-rc1</Version>
<Version>2.5.3-rc2</Version>
</PropertyGroup>

</Project>
10 changes: 4 additions & 6 deletions src/Infrastructure/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version: '3.4'
version: "3.4"

services:
zookeeper:
Expand All @@ -16,13 +16,13 @@ services:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_CREATE_TOPICS: "user-test-ping:2:1,user-test-echo:2:1"
KAFKA_CREATE_TOPICS: "user-test-ping:2:1,user-test-echo:2:1,user-test-echo-resp:2:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
depends_on:
- zookeeper
networks:
- slim

mqtt:
container_name: slim.mqtt
image: eclipse-mosquitto:2.0.18
Expand Down Expand Up @@ -74,16 +74,14 @@ services:
- "11002:11002"
networks:
- slim

nats:
container_name: slim.nats
image: nats:2.10
ports:
- 4222:4222
networks:
- slim


networks:
slim: {}

Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public interface IKafkaPartitionConsumer : IDisposable

void OnPartitionAssigned(TopicPartition partition);
Task OnMessage(ConsumeResult message);
void OnPartitionEndReached(TopicPartitionOffset offset);
void OnPartitionEndReached();
void OnPartitionRevoked();

void OnClose();
Expand Down
16 changes: 11 additions & 5 deletions src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ protected IConsumer CreateConsumer(string group)
// ToDo: add support for auto commit
config.EnableAutoCommit = false;
// Notify when we reach EoF, so that we can do a manual commit
config.EnablePartitionEof = true;
config.EnablePartitionEof = true;

var consumer = MessageBus.ProviderSettings.ConsumerBuilderFactory(config)
.SetStatisticsHandler((_, json) => OnStatistics(json))
.SetStatisticsHandler((_, json) => OnStatistics(json))
.SetPartitionsAssignedHandler((_, partitions) => OnPartitionAssigned(partitions))
.SetPartitionsRevokedHandler((_, partitions) => OnPartitionRevoked(partitions))
.SetOffsetsCommittedHandler((_, offsets) => OnOffsetsCommitted(offsets))
Expand Down Expand Up @@ -201,7 +201,7 @@ protected virtual void OnPartitionEndReached(TopicPartitionOffset offset)
Logger.LogDebug("Group [{Group}]: Reached end of partition, Topic: {Topic}, Partition: {Partition}, Offset: {Offset}", Group, offset.Topic, offset.Partition, offset.Offset);

var processor = _processors[offset.TopicPartition];
processor.OnPartitionEndReached(offset);
processor.OnPartitionEndReached();
}

protected async virtual ValueTask OnMessage(ConsumeResult message)
Expand All @@ -216,11 +216,17 @@ protected virtual void OnOffsetsCommitted(CommittedOffsets e)
{
if (e.Error.IsError || e.Error.IsFatal)
{
Logger.LogWarning("Group [{Group}]: Failed to commit offsets: [{Offsets}], error: {error}", Group, string.Join(", ", e.Offsets), e.Error.Reason);
if (Logger.IsEnabled(LogLevel.Warning))
{
Logger.LogWarning("Group [{Group}]: Failed to commit offsets: [{Offsets}], error: {ErrorMessage}", Group, string.Join(", ", e.Offsets), e.Error.Reason);
}
}
else
{
Logger.LogTrace("Group [{Group}]: Successfully committed offsets: [{Offsets}]", Group, string.Join(", ", e.Offsets));
if (Logger.IsEnabled(LogLevel.Debug))
{
Logger.LogDebug("Group [{Group}]: Successfully committed offsets: [{Offsets}]", Group, string.Join(", ", e.Offsets));
}
}
}

Expand Down
38 changes: 26 additions & 12 deletions src/SlimMessageBus.Host.Kafka/Consumer/KafkaPartitionConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,16 @@ protected KafkaPartitionConsumer(ILoggerFactory loggerFactory, AbstractConsumerS
_commitController = commitController;
_messageProcessor = messageProcessor;

// ToDo: Add support for Kafka driven automatic commit
// ToDo: Add support for Kafka driven automatic commit (https://github.com/zarusz/SlimMessageBus/issues/131)
CheckpointTrigger = CreateCheckpointTrigger();
}

private ICheckpointTrigger CreateCheckpointTrigger()
{
var f = new CheckpointTriggerFactory(LoggerFactory, (configuredCheckpoints) => $"The checkpoint settings ({nameof(BuilderExtensions.CheckpointAfter)} and {nameof(BuilderExtensions.CheckpointEvery)}) across all the consumers that use the same Topic {TopicPartition.Topic} and Group {Group} must be the same (found settings are: {string.Join(", ", configuredCheckpoints)})");
var f = new CheckpointTriggerFactory(
LoggerFactory,
(configuredCheckpoints) => $"The checkpoint settings ({nameof(BuilderExtensions.CheckpointAfter)} and {nameof(BuilderExtensions.CheckpointEvery)}) across all the consumers that use the same Topic {TopicPartition.Topic} and Group {Group} must be the same (found settings are: {string.Join(", ", configuredCheckpoints)})");

return f.Create(ConsumerSettings);
}

Expand Down Expand Up @@ -101,11 +104,22 @@ public async Task OnMessage(ConsumeResult message)
_lastOffset = message.TopicPartitionOffset;

var messageHeaders = message.ToHeaders(_headerSerializer);

// Log in trace level all the message headers converted to string
if (_logger.IsEnabled(LogLevel.Trace))
{
foreach (var header in messageHeaders)
{
_logger.LogTrace("Group [{Group}]: Topic: {Topic}, Partition: {Partition}, Offset: {Offset}, Message Header: {HeaderKey}={HeaderValue}", Group, message.TopicPartitionOffset.Topic, message.TopicPartitionOffset.Partition, message.TopicPartitionOffset.Offset, header.Key, header.Value);
}
}

var r = await _messageProcessor.ProcessMessage(message, messageHeaders, cancellationToken: _cancellationTokenSource.Token).ConfigureAwait(false);
if (r.Exception != null)
{
// ToDo: Retry logic
// The OnMessageFaulted was called at this point by the MessageProcessor.
// The IKafkaConsumerErrorHandler and OnMessageFaulted was called at this point by the MessageProcessor.
// We can only log and move to the next message, as the error handling is done by the MessageProcessor.
LogError(r.Exception, message);
}

if (CheckpointTrigger != null && CheckpointTrigger.Increment())
Expand All @@ -114,20 +128,20 @@ public async Task OnMessage(ConsumeResult message)
}
}
catch (Exception e)
{
_logger.LogError(e, "Group [{Group}]: Error occurred while consuming a message at Topic: {Topic}, Partition: {Partition}, Offset: {Offset}", Group, message.Topic, message.Partition, message.Offset);
{
LogError(e, message);
throw;
}
}
}

private void LogError(Exception e, ConsumeResult<Ignore, byte[]> message)
=> _logger.LogError(e, "Group [{Group}]: Error occurred while consuming a message at Topic: {Topic}, Partition: {Partition}, Offset: {Offset}, Error: {ErrorMessage}", Group, message.Topic, message.Partition, message.Offset, e.Message);

public void OnPartitionEndReached(TopicPartitionOffset offset)
public void OnPartitionEndReached()
{
if (CheckpointTrigger != null)
{
if (offset != null)
{
Commit(offset);
}
Commit(_lastOffset);
}
}

Expand Down
Loading

0 comments on commit 2ff704b

Please sign in to comment.