Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for SCRAM authentication for kafka metricbeat module, fixes #19648 #24810

Merged
merged 13 commits into from
Apr 1, 2021
46 changes: 46 additions & 0 deletions libbeat/common/kafka/sasl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package kafka
fholzer marked this conversation as resolved.
Show resolved Hide resolved

import (
"fmt"
"strings"

"github.com/Shopify/sarama"
)

type SaslConfig struct {
SaslMechanism string `config:"mechanism"`
//SaslUsername string `config:"username"` //maybe use ssl.username ssl.password instead in future?
//SaslPassword string `config:"password"`
fholzer marked this conversation as resolved.
Show resolved Hide resolved
}

const (
saslTypePlaintext = sarama.SASLTypePlaintext
saslTypeSCRAMSHA256 = sarama.SASLTypeSCRAMSHA256
saslTypeSCRAMSHA512 = sarama.SASLTypeSCRAMSHA512
)

func (c *SaslConfig) ConfigureSarama(config *sarama.Config) error {
switch strings.ToUpper(c.SaslMechanism) { // try not to force users to use all upper case
case "":
// SASL is not enabled
return nil
case saslTypePlaintext:
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypePlaintext)
case saslTypeSCRAMSHA256:
config.Net.SASL.Handshake = true
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256)
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA256}
}
case saslTypeSCRAMSHA512:
config.Net.SASL.Handshake = true
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
}
default:
return fmt.Errorf("not valid mechanism '%v', only supported with PLAIN|SCRAM-SHA-512|SCRAM-SHA-256", c.SaslMechanism)
}

return nil
}
File renamed without changes.
38 changes: 2 additions & 36 deletions libbeat/outputs/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,10 @@ type kafkaConfig struct {
Username string `config:"username"`
Password string `config:"password"`
Codec codec.Config `config:"codec"`
Sasl saslConfig `config:"sasl"`
Sasl kafka.SaslConfig `config:"sasl"`
EnableFAST bool `config:"enable_krb5_fast"`
}

type saslConfig struct {
SaslMechanism string `config:"mechanism"`
}

type metaConfig struct {
Retry metaRetryConfig `config:"retry"`
RefreshFreq time.Duration `config:"refresh_frequency" validate:"min=0"`
Expand Down Expand Up @@ -140,36 +136,6 @@ func defaultConfig() kafkaConfig {
}
}

func (c *saslConfig) configureSarama(config *sarama.Config) error {
switch strings.ToUpper(c.SaslMechanism) { // try not to force users to use all upper case
case "":
// SASL is not enabled
return nil
case saslTypePlaintext:
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypePlaintext)
case saslTypeSCRAMSHA256:
cfgwarn.Beta("SCRAM-SHA-256 authentication for Kafka is beta.")

config.Net.SASL.Handshake = true
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256)
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA256}
}
case saslTypeSCRAMSHA512:
cfgwarn.Beta("SCRAM-SHA-512 authentication for Kafka is beta.")

config.Net.SASL.Handshake = true
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
}
default:
return fmt.Errorf("not valid mechanism '%v', only supported with PLAIN|SCRAM-SHA-512|SCRAM-SHA-256", c.SaslMechanism)
}

return nil
}

func readConfig(cfg *common.Config) (*kafkaConfig, error) {
c := defaultConfig()
if err := cfg.Unpack(&c); err != nil {
Expand Down Expand Up @@ -252,7 +218,7 @@ func newSaramaConfig(log *logp.Logger, config *kafkaConfig) (*sarama.Config, err
k.Net.SASL.Enable = true
k.Net.SASL.User = config.Username
k.Net.SASL.Password = config.Password
err = config.Sasl.configureSarama(k)
err = config.Sasl.ConfigureSarama(k)

if err != nil {
return nil, err
Expand Down
4 changes: 4 additions & 0 deletions metricbeat/module/kafka/_meta/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
#username: ""
#password: ""

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Metrics collected from a Kafka broker using Jolokia
#- module: kafka
# metricsets:
Expand Down
12 changes: 12 additions & 0 deletions metricbeat/module/kafka/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,15 @@ type BrokerSettings struct {
TLS *tls.Config
Username, Password string
Version kafka.Version
Sasl kafka.SaslConfig
}

//const (
// saslTypePlaintext = sarama.SASLTypePlaintext
// saslTypeSCRAMSHA256 = sarama.SASLTypeSCRAMSHA256
// saslTypeSCRAMSHA512 = sarama.SASLTypeSCRAMSHA512
//)
fholzer marked this conversation as resolved.
Show resolved Hide resolved

type GroupDescription struct {
Members map[string]MemberDescription
}
Expand Down Expand Up @@ -91,6 +98,11 @@ func NewBroker(host string, settings BrokerSettings) *Broker {
cfg.Net.SASL.Enable = true
cfg.Net.SASL.User = user
cfg.Net.SASL.Password = settings.Password
err := settings.Sasl.ConfigureSarama(cfg)

if err != nil {
return nil
}
fholzer marked this conversation as resolved.
Show resolved Hide resolved
}
cfg.Version, _ = settings.Version.Get()

Expand Down
2 changes: 2 additions & 0 deletions metricbeat/module/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/v7/libbeat/common/kafka"
)

type metricsetConfig struct {
Expand All @@ -31,6 +32,7 @@ type metricsetConfig struct {
Username string `config:"username"`
Password string `config:"password"`
ClientID string `config:"client_id"`
Sasl kafka.SaslConfig `config:"sasl"`
}

var defaultConfig = metricsetConfig{
Expand Down
1 change: 1 addition & 0 deletions metricbeat/module/kafka/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func NewMetricSet(base mb.BaseMetricSet, options MetricSetOptions) (*MetricSet,
Username: config.Username,
Password: config.Password,
Version: Version(options.Version),
Sasl: config.Sasl,
}

return &MetricSet{
Expand Down