Skip to content

Commit

Permalink
[test]: make Kafka consumer read from the earliest messages (#4981)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- Resolves #4979

## Description of the changes
- Change kafka consumer to consume messages from the oldest possible
offsets to prevent failure when the test produces messages before the
consumer is ready.

## How was this change tested?
- Start a kafka server hosted at localhost:9092
```
docker run --name kafka \
    --network jaeger \
    -p 9092:9092 \
    -e KAFKA_CFG_NODE_ID=0 \
    -e KAFKA_CFG_PROCESS_ROLES=controller,broker \
    -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 \
    -e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
    -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
    -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
    -e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
    -e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
    bitnami/kafka:3.6
```
- Run `./scripts/kafka-integration-test.sh`

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: James Ryans <james.ryans2012@gmail.com>
  • Loading branch information
james-ryans authored Dec 1, 2023
1 parent 1dd4b60 commit 02dc655
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 6 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci-kafka.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ jobs:
env:
ALLOW_ANONYMOUS_LOGIN: yes
kafka:
image: bitnami/kafka:3.1.0
image: bitnami/kafka:3.6.0
ports:
- 9092:9092
options: >-
Expand Down
1 change: 1 addition & 0 deletions cmd/ingester/app/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit
consumerConfig := kafkaConsumer.Configuration{
Brokers: options.Brokers,
Topic: options.Topic,
InitialOffset: options.InitialOffset,
GroupID: options.GroupID,
ClientID: options.ClientID,
ProtocolVersion: options.ProtocolVersion,
Expand Down
12 changes: 8 additions & 4 deletions pkg/kafka/consumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,11 @@ type Configuration struct {

Brokers []string `mapstructure:"brokers"`
Topic string `mapstructure:"topic"`
GroupID string `mapstructure:"group_id"`
ClientID string `mapstructure:"client_id"`
ProtocolVersion string `mapstructure:"protocol_version"`
RackID string `mapstructure:"rack_id"`
InitialOffset int64
GroupID string `mapstructure:"group_id"`
ClientID string `mapstructure:"client_id"`
ProtocolVersion string `mapstructure:"protocol_version"`
RackID string `mapstructure:"rack_id"`
}

// NewConsumer creates a new kafka consumer
Expand All @@ -71,5 +72,8 @@ func (c *Configuration) NewConsumer(logger *zap.Logger) (Consumer, error) {
// that does not set saramaConfig.Consumer.Offsets.CommitInterval to its default value 1s.
// then the samara-cluster fails if the default interval is not 1s.
saramaConfig.Consumer.Offsets.CommitInterval = time.Second
if c.InitialOffset != 0 {
saramaConfig.Consumer.Offsets.Initial = c.InitialOffset
}
return cluster.NewConsumer(c.Brokers, c.GroupID, []string{c.Topic}, saramaConfig)
}
8 changes: 7 additions & 1 deletion plugin/storage/integration/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ import (
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/ingester/app"
"github.com/jaegertracing/jaeger/cmd/ingester/app/builder"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/kafka/consumer"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
Expand Down Expand Up @@ -90,7 +92,11 @@ func (s *KafkaIntegrationTestSuite) initialize() error {
if err != nil {
return err
}
options := app.Options{}
options := app.Options{
Configuration: consumer.Configuration{
InitialOffset: sarama.OffsetOldest,
},
}
options.InitFromViper(v)
traceStore := memory.NewStore()
spanConsumer, err := builder.CreateConsumer(s.logger, metrics.NullFactory, traceStore, options)
Expand Down

0 comments on commit 02dc655

Please sign in to comment.