Skip to content

Commit

Permalink
Address issues raised in pull request for #2047
Browse files Browse the repository at this point in the history
1. Setting proper defaults
2. Rename cli option name
3. Change cli option type

Signed-off-by: albert chung <alchung@us.ibm.com>
Signed-off-by: albert <alchung@us.ibm.com>
  • Loading branch information
apm-opentt committed Jan 28, 2020
1 parent 04da10c commit 1de4089
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 15 deletions.
4 changes: 2 additions & 2 deletions pkg/kafka/producer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Configuration struct {
Compression sarama.CompressionCodec
CompressionLevel int
ProtocolVersion string
LingerMS int
BatchLinger time.Duration
BatchSize int
auth.AuthenticationConfig
}
Expand All @@ -47,7 +47,7 @@ func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) {
saramaConfig.Producer.CompressionLevel = c.CompressionLevel
saramaConfig.Producer.Return.Successes = true
saramaConfig.Producer.Flush.Bytes = c.BatchSize
saramaConfig.Producer.Flush.Frequency = time.Duration(c.LingerMS) * time.Millisecond
saramaConfig.Producer.Flush.Frequency = c.BatchLinger
if len(c.ProtocolVersion) > 0 {
ver, err := sarama.ParseKafkaVersion(c.ProtocolVersion)
if err != nil {
Expand Down
19 changes: 10 additions & 9 deletions plugin/storage/kafka/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"log"
"strings"
"time"

"github.com/Shopify/sarama"
"github.com/spf13/viper"
Expand All @@ -43,7 +44,7 @@ const (
suffixCompression = ".compression"
suffixCompressionLevel = ".compression-level"
suffixProtocolVersion = ".protocol-version"
suffixLingerMS = ".linger-ms"
suffixLinger = ".linger"
suffixBatchSize = ".batch-size"

defaultBroker = "127.0.0.1:9092"
Expand All @@ -52,8 +53,8 @@ const (
defaultRequiredAcks = "local"
defaultCompression = "none"
defaultCompressionLevel = 0
defaultLingerMS = 10
defaultBatchSize = 1024
defaultLinger = time.Duration(0 * time.Millisecond)
defaultBatchSize = 16384
)

var (
Expand Down Expand Up @@ -145,15 +146,15 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
defaultCompressionLevel,
"(experimental) compression level to use on messages. gzip = 1-9 (default = 6), snappy = none, lz4 = 1-17 (default = 9), zstd = -131072 - 22 (default = 3)",
)
flagSet.Int(
configPrefix+suffixLingerMS,
defaultLingerMS,
"(experimental) Number of milliseconds to delay before sending records to Kafka. Higher value reduce request volume to Kafka but increase latency and the possibility of data loss in case of process restart.",
flagSet.Duration(
configPrefix+suffixLinger,
defaultLinger,
"(experimental) Number of milliseconds to delay before sending records to Kafka. Higher value reduce request to Kafka but increase latency and the possibility of data loss in case of process restart. See https://kafka.apache.org/documentation/",
)
flagSet.Int(
configPrefix+suffixBatchSize,
defaultBatchSize,
"(experimental) Number of bytes to batch before sending records to Kafka. Higher value reduce request volume to Kafka but increase latency and the possibility of data loss in case of process restart.",
"(experimental) Number of bytes to batch before sending records to Kafka. Higher value reduce request to Kafka but increase latency and the possibility of data loss in case of process restart. See https://kafka.apache.org/documentation/",
)
auth.AddFlags(configPrefix, flagSet)
}
Expand Down Expand Up @@ -186,7 +187,7 @@ func (opt *Options) InitFromViper(v *viper.Viper) {
CompressionLevel: compressionLevel,
ProtocolVersion: v.GetString(configPrefix + suffixProtocolVersion),
AuthenticationConfig: authenticationOptions,
LingerMS: v.GetInt(configPrefix + suffixLingerMS),
BatchLinger: v.GetDuration(configPrefix + suffixLinger),
BatchSize: v.GetInt(configPrefix + suffixBatchSize),
}
opt.topic = v.GetString(configPrefix + suffixTopic)
Expand Down
9 changes: 5 additions & 4 deletions plugin/storage/kafka/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package kafka

import (
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/stretchr/testify/assert"
Expand All @@ -34,7 +35,7 @@ func TestOptionsWithFlags(t *testing.T) {
"--kafka.producer.required-acks=local",
"--kafka.producer.compression=gzip",
"--kafka.producer.compression-level=7",
"--kafka.producer.linger-ms=1000",
"--kafka.producer.linger=1s",
"--kafka.producer.batch-size=128000",
})
opts.InitFromViper(v)
Expand All @@ -46,7 +47,7 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, sarama.CompressionGZIP, opts.config.Compression)
assert.Equal(t, 7, opts.config.CompressionLevel)
assert.Equal(t, 128000, opts.config.BatchSize)
assert.Equal(t, 1000, opts.config.LingerMS)
assert.Equal(t, time.Duration(1*time.Second), opts.config.BatchLinger)
}

func TestFlagDefaults(t *testing.T) {
Expand All @@ -61,8 +62,8 @@ func TestFlagDefaults(t *testing.T) {
assert.Equal(t, sarama.WaitForLocal, opts.config.RequiredAcks)
assert.Equal(t, sarama.CompressionNone, opts.config.Compression)
assert.Equal(t, 0, opts.config.CompressionLevel)
assert.Equal(t, 1024, opts.config.BatchSize)
assert.Equal(t, 10, opts.config.LingerMS)
assert.Equal(t, 16384, opts.config.BatchSize)
assert.Equal(t, time.Duration(0*time.Second), opts.config.BatchLinger)
}

func TestCompressionLevelDefaults(t *testing.T) {
Expand Down

0 comments on commit 1de4089

Please sign in to comment.