Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

supports customized kafka client id #1507

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cmd/ingester/app/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit
spanProcessor := processor.NewSpanProcessor(spParams)

consumerConfig := kafkaConsumer.Configuration{
Brokers: options.Brokers,
Topic: options.Topic,
GroupID: options.GroupID,
Brokers: options.Brokers,
Topic: options.Topic,
GroupID: options.GroupID,
ClientID: options.ClientID,
}
saramaConsumer, err := consumerConfig.NewConsumer()
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions cmd/ingester/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ const (
SuffixTopic = ".topic"
// SuffixGroupID is a suffix for the group-id flag
SuffixGroupID = ".group-id"
// SuffixClientID is a suffix for the client-id flag
SuffixClientID = ".client-id"
// SuffixEncoding is a suffix for the encoding flag
SuffixEncoding = ".encoding"
// SuffixDeadlockInterval is a suffix for deadlock detecor flag
Expand All @@ -53,6 +55,8 @@ const (
DefaultTopic = "jaeger-spans"
// DefaultGroupID is the default consumer Group ID
DefaultGroupID = "jaeger-ingester"
// DefaultClientID is the default consumer Client ID
DefaultClientID = "jaeger-ingester"
// DefaultParallelism is the default parallelism for the span processor
DefaultParallelism = 1000
// DefaultEncoding is the default span encoding
Expand All @@ -64,6 +68,7 @@ const (
// Options stores the configuration options for the Ingester
type Options struct {
kafkaConsumer.Configuration
ClientID string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't notice this earlier - but we should move this to kafkaConsumer.Configuration which is where GroupID/Topic/etc are configured.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes it is redundant. it is removed.

Parallelism int
Encoding string
DeadlockInterval time.Duration
Expand All @@ -83,6 +88,10 @@ func AddFlags(flagSet *flag.FlagSet) {
KafkaConsumerConfigPrefix+SuffixGroupID,
DefaultGroupID,
"The Consumer Group that ingester will be consuming on behalf of")
flagSet.String(
KafkaConsumerConfigPrefix+SuffixClientID,
DefaultClientID,
"The Consumer Client ID that ingester will use")
flagSet.String(
KafkaConsumerConfigPrefix+SuffixEncoding,
DefaultEncoding,
Expand All @@ -102,6 +111,7 @@ func (o *Options) InitFromViper(v *viper.Viper) {
o.Brokers = strings.Split(stripWhiteSpace(v.GetString(KafkaConsumerConfigPrefix+SuffixBrokers)), ",")
o.Topic = v.GetString(KafkaConsumerConfigPrefix + SuffixTopic)
o.GroupID = v.GetString(KafkaConsumerConfigPrefix + SuffixGroupID)
o.ClientID = v.GetString(KafkaConsumerConfigPrefix + SuffixClientID)
o.Encoding = v.GetString(KafkaConsumerConfigPrefix + SuffixEncoding)

o.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism)
Expand Down
3 changes: 3 additions & 0 deletions cmd/ingester/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func TestOptionsWithFlags(t *testing.T) {
"--kafka.consumer.topic=topic1",
"--kafka.consumer.brokers=127.0.0.1:9092, 0.0.0:1234",
"--kafka.consumer.group-id=group1",
"--kafka.consumer.client-id=client-id1",
"--kafka.consumer.encoding=json",
"--ingester.parallelism=5",
"--ingester.deadlockInterval=2m",
Expand All @@ -40,6 +41,7 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, "topic1", o.Topic)
assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, o.Brokers)
assert.Equal(t, "group1", o.GroupID)
assert.Equal(t, "client-id1", o.ClientID)
assert.Equal(t, 5, o.Parallelism)
assert.Equal(t, 2*time.Minute, o.DeadlockInterval)
assert.Equal(t, kafka.EncodingJSON, o.Encoding)
Expand All @@ -54,6 +56,7 @@ func TestFlagDefaults(t *testing.T) {
assert.Equal(t, DefaultTopic, o.Topic)
assert.Equal(t, []string{DefaultBroker}, o.Brokers)
assert.Equal(t, DefaultGroupID, o.GroupID)
assert.Equal(t, DefaultClientID, o.ClientID)
assert.Equal(t, DefaultParallelism, o.Parallelism)
assert.Equal(t, DefaultEncoding, o.Encoding)
assert.Equal(t, DefaultDeadlockInterval, o.DeadlockInterval)
Expand Down
8 changes: 5 additions & 3 deletions pkg/kafka/consumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,17 @@ type Builder interface {

// Configuration describes the configuration properties needed to create a Kafka consumer
type Configuration struct {
Brokers []string
Topic string
GroupID string
Brokers []string
Topic string
GroupID string
ClientID string
Consumer
}

// NewConsumer creates a new kafka consumer
func (c *Configuration) NewConsumer() (Consumer, error) {
saramaConfig := cluster.NewConfig()
saramaConfig.Group.Mode = cluster.ConsumerModePartitions
saramaConfig.ClientID = c.ClientID
return cluster.NewConsumer(c.Brokers, c.GroupID, []string{c.Topic}, saramaConfig)
}
3 changes: 3 additions & 0 deletions plugin/storage/integration/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func (s *KafkaIntegrationTestSuite) initialize() error {
s.logger, _ = testutils.NewLogger()
const encoding = "json"
const groupID = "kafka-integration-test"
const ClientID = "kafka-integration-test"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this can be private

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. it's updated.

// A new topic is generated per execution to avoid data overlap
topic := "jaeger-kafka-integration-test-" + strconv.FormatInt(time.Now().UnixNano(), 10)

Expand Down Expand Up @@ -81,6 +82,8 @@ func (s *KafkaIntegrationTestSuite) initialize() error {
encoding,
"--kafka.consumer.group-id",
groupID,
"--kafka.consumer.client-id",
ClientID,
"--ingester.parallelism",
"1000",
})
Expand Down