Skip to content

Commit

Permalink
Add support for SCRAM authentication in kafka metricbeat module (#24810
Browse files Browse the repository at this point in the history
…) (#24889)

(cherry picked from commit 66829ff)

Co-authored-by: Ferdinand Holzer <fholzer@users.noreply.github.com>
  • Loading branch information
jsoriano and fholzer authored Apr 1, 2021
1 parent f2d4c16 commit 1bc0b43
Show file tree
Hide file tree
Showing 12 changed files with 97 additions and 40 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Check fields are documented in aws metricsets. {pull}23887[23887]
- Add support for defining metrics_filters for prometheus module in hints. {pull}24264[24264]
- Add support for PostgreSQL 10, 11, 12 and 13. {pull}24402[24402]
- Add support for SASL/SCRAM authentication to the Kafka module. {pull}24810[24810]

*Packetbeat*

Expand Down
69 changes: 69 additions & 0 deletions libbeat/common/kafka/sasl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package kafka

import (
"fmt"
"strings"

"github.com/Shopify/sarama"
)

type SaslConfig struct {
SaslMechanism string `config:"mechanism"`
}

const (
saslTypePlaintext = sarama.SASLTypePlaintext
saslTypeSCRAMSHA256 = sarama.SASLTypeSCRAMSHA256
saslTypeSCRAMSHA512 = sarama.SASLTypeSCRAMSHA512
)

func (c *SaslConfig) ConfigureSarama(config *sarama.Config) {
switch strings.ToUpper(c.SaslMechanism) { // try not to force users to use all upper case
case "":
// SASL is not enabled
return
case saslTypePlaintext:
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypePlaintext)
case saslTypeSCRAMSHA256:
config.Net.SASL.Handshake = true
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256)
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA256}
}
case saslTypeSCRAMSHA512:
config.Net.SASL.Handshake = true
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
}
default:
// This should never happen because `SaslMechanism` is checked on `Validate()`, keeping a panic to detect it earlier if it happens.
panic(fmt.Sprintf("not valid SASL mechanism '%v', only supported with PLAIN|SCRAM-SHA-512|SCRAM-SHA-256", c.SaslMechanism))
}
}

func (c *SaslConfig) Validate() error {
switch strings.ToUpper(c.SaslMechanism) { // try not to force users to use all upper case
case "", saslTypePlaintext, saslTypeSCRAMSHA256, saslTypeSCRAMSHA512:
default:
return fmt.Errorf("not valid SASL mechanism '%v', only supported with PLAIN|SCRAM-SHA-512|SCRAM-SHA-256", c.SaslMechanism)
}
return nil
}
File renamed without changes.
42 changes: 2 additions & 40 deletions libbeat/outputs/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,10 @@ type kafkaConfig struct {
Username string `config:"username"`
Password string `config:"password"`
Codec codec.Config `config:"codec"`
Sasl saslConfig `config:"sasl"`
Sasl kafka.SaslConfig `config:"sasl"`
EnableFAST bool `config:"enable_krb5_fast"`
}

type saslConfig struct {
SaslMechanism string `config:"mechanism"`
}

type metaConfig struct {
Retry metaRetryConfig `config:"retry"`
RefreshFreq time.Duration `config:"refresh_frequency" validate:"min=0"`
Expand Down Expand Up @@ -137,36 +133,6 @@ func defaultConfig() kafkaConfig {
}
}

func (c *saslConfig) configureSarama(config *sarama.Config) error {
switch strings.ToUpper(c.SaslMechanism) { // try not to force users to use all upper case
case "":
// SASL is not enabled
return nil
case saslTypePlaintext:
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypePlaintext)
case saslTypeSCRAMSHA256:
cfgwarn.Beta("SCRAM-SHA-256 authentication for Kafka is beta.")

config.Net.SASL.Handshake = true
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256)
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA256}
}
case saslTypeSCRAMSHA512:
cfgwarn.Beta("SCRAM-SHA-512 authentication for Kafka is beta.")

config.Net.SASL.Handshake = true
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
}
default:
return fmt.Errorf("not valid mechanism '%v', only supported with PLAIN|SCRAM-SHA-512|SCRAM-SHA-256", c.SaslMechanism)
}

return nil
}

func readConfig(cfg *common.Config) (*kafkaConfig, error) {
c := defaultConfig()
if err := cfg.Unpack(&c); err != nil {
Expand Down Expand Up @@ -249,11 +215,7 @@ func newSaramaConfig(log *logp.Logger, config *kafkaConfig) (*sarama.Config, err
k.Net.SASL.Enable = true
k.Net.SASL.User = config.Username
k.Net.SASL.Password = config.Password
err = config.Sasl.configureSarama(k)

if err != nil {
return nil, err
}
config.Sasl.ConfigureSarama(k)
}

// configure metadata update properties
Expand Down
4 changes: 4 additions & 0 deletions metricbeat/docs/modules/kafka.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ metricbeat.modules:
#username: ""
#password: ""
# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''
# Metrics collected from a Kafka broker using Jolokia
#- module: kafka
# metricsets:
Expand Down
4 changes: 4 additions & 0 deletions metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,10 @@ metricbeat.modules:
#username: ""
#password: ""

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Metrics collected from a Kafka broker using Jolokia
#- module: kafka
# metricsets:
Expand Down
4 changes: 4 additions & 0 deletions metricbeat/module/kafka/_meta/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
#username: ""
#password: ""

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Metrics collected from a Kafka broker using Jolokia
#- module: kafka
# metricsets:
Expand Down
2 changes: 2 additions & 0 deletions metricbeat/module/kafka/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type BrokerSettings struct {
TLS *tls.Config
Username, Password string
Version kafka.Version
Sasl kafka.SaslConfig
}

type GroupDescription struct {
Expand Down Expand Up @@ -91,6 +92,7 @@ func NewBroker(host string, settings BrokerSettings) *Broker {
cfg.Net.SASL.Enable = true
cfg.Net.SASL.User = user
cfg.Net.SASL.Password = settings.Password
settings.Sasl.ConfigureSarama(cfg)
}
cfg.Version, _ = settings.Version.Get()

Expand Down
2 changes: 2 additions & 0 deletions metricbeat/module/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"time"

"github.com/elastic/beats/v7/libbeat/common/kafka"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
)

Expand All @@ -31,6 +32,7 @@ type metricsetConfig struct {
Username string `config:"username"`
Password string `config:"password"`
ClientID string `config:"client_id"`
Sasl kafka.SaslConfig `config:"sasl"`
}

var defaultConfig = metricsetConfig{
Expand Down
1 change: 1 addition & 0 deletions metricbeat/module/kafka/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func NewMetricSet(base mb.BaseMetricSet, options MetricSetOptions) (*MetricSet,
Username: config.Username,
Password: config.Password,
Version: Version(options.Version),
Sasl: config.Sasl,
}

return &MetricSet{
Expand Down
4 changes: 4 additions & 0 deletions metricbeat/modules.d/kafka.yml.disabled
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
#username: ""
#password: ""

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Metrics collected from a Kafka broker using Jolokia
#- module: kafka
# metricsets:
Expand Down
4 changes: 4 additions & 0 deletions x-pack/metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,10 @@ metricbeat.modules:
#username: ""
#password: ""

# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''

# Metrics collected from a Kafka broker using Jolokia
#- module: kafka
# metricsets:
Expand Down

0 comments on commit 1bc0b43

Please sign in to comment.