diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index e74fdb830fc..bd6da2879af 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -944,6 +944,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Move IIS module to GA and map fields. {issue}22609[22609] {pull}23024[23024] - Apache: convert status.total_kbytes to status.total_bytes in fleet mode. {pull}23022[23022] - Release MSSQL as GA {pull}23146[23146] +- Add support for SASL/SCRAM authentication to the Kafka module. {pull}24810[24810] *Packetbeat* diff --git a/libbeat/common/kafka/sasl.go b/libbeat/common/kafka/sasl.go new file mode 100644 index 00000000000..9a6b3314b8b --- /dev/null +++ b/libbeat/common/kafka/sasl.go @@ -0,0 +1,69 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package kafka + +import ( + "fmt" + "strings" + + "github.com/Shopify/sarama" +) + +type SaslConfig struct { + SaslMechanism string `config:"mechanism"` +} + +const ( + saslTypePlaintext = sarama.SASLTypePlaintext + saslTypeSCRAMSHA256 = sarama.SASLTypeSCRAMSHA256 + saslTypeSCRAMSHA512 = sarama.SASLTypeSCRAMSHA512 +) + +func (c *SaslConfig) ConfigureSarama(config *sarama.Config) { + switch strings.ToUpper(c.SaslMechanism) { // try not to force users to use all upper case + case "": + // SASL is not enabled + return + case saslTypePlaintext: + config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypePlaintext) + case saslTypeSCRAMSHA256: + config.Net.SASL.Handshake = true + config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256) + config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { + return &XDGSCRAMClient{HashGeneratorFcn: SHA256} + } + case saslTypeSCRAMSHA512: + config.Net.SASL.Handshake = true + config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512) + config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { + return &XDGSCRAMClient{HashGeneratorFcn: SHA512} + } + default: + // This should never happen because `SaslMechanism` is checked on `Validate()`, keeping a panic to detect it earlier if it happens. + panic(fmt.Sprintf("not valid SASL mechanism '%v', only supported with PLAIN|SCRAM-SHA-512|SCRAM-SHA-256", c.SaslMechanism)) + } +} + +func (c *SaslConfig) Validate() error { + switch strings.ToUpper(c.SaslMechanism) { // try not to force users to use all upper case + case "", saslTypePlaintext, saslTypeSCRAMSHA256, saslTypeSCRAMSHA512: + default: + return fmt.Errorf("not valid SASL mechanism '%v', only supported with PLAIN|SCRAM-SHA-512|SCRAM-SHA-256", c.SaslMechanism) + } + return nil +} diff --git a/libbeat/outputs/kafka/scram.go b/libbeat/common/kafka/scram.go similarity index 100% rename from libbeat/outputs/kafka/scram.go rename to libbeat/common/kafka/scram.go diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go index 3747a2fa63c..374ed500e10 100644 --- a/libbeat/outputs/kafka/config.go +++ b/libbeat/outputs/kafka/config.go @@ -68,14 +68,10 @@ type kafkaConfig struct { Username string `config:"username"` Password string `config:"password"` Codec codec.Config `config:"codec"` - Sasl saslConfig `config:"sasl"` + Sasl kafka.SaslConfig `config:"sasl"` EnableFAST bool `config:"enable_krb5_fast"` } -type saslConfig struct { - SaslMechanism string `config:"mechanism"` -} - type metaConfig struct { Retry metaRetryConfig `config:"retry"` RefreshFreq time.Duration `config:"refresh_frequency" validate:"min=0"` @@ -140,36 +136,6 @@ func defaultConfig() kafkaConfig { } } -func (c *saslConfig) configureSarama(config *sarama.Config) error { - switch strings.ToUpper(c.SaslMechanism) { // try not to force users to use all upper case - case "": - // SASL is not enabled - return nil - case saslTypePlaintext: - config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypePlaintext) - case saslTypeSCRAMSHA256: - cfgwarn.Beta("SCRAM-SHA-256 authentication for Kafka is beta.") - - config.Net.SASL.Handshake = true - config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256) - config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { - return &XDGSCRAMClient{HashGeneratorFcn: SHA256} - } - case saslTypeSCRAMSHA512: - cfgwarn.Beta("SCRAM-SHA-512 authentication for Kafka is beta.") - - config.Net.SASL.Handshake = true - config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512) - config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { - return &XDGSCRAMClient{HashGeneratorFcn: SHA512} - } - default: - return fmt.Errorf("not valid mechanism '%v', only supported with PLAIN|SCRAM-SHA-512|SCRAM-SHA-256", c.SaslMechanism) - } - - return nil -} - func readConfig(cfg *common.Config) (*kafkaConfig, error) { c := defaultConfig() if err := cfg.Unpack(&c); err != nil { @@ -252,11 +218,7 @@ func newSaramaConfig(log *logp.Logger, config *kafkaConfig) (*sarama.Config, err k.Net.SASL.Enable = true k.Net.SASL.User = config.Username k.Net.SASL.Password = config.Password - err = config.Sasl.configureSarama(k) - - if err != nil { - return nil, err - } + config.Sasl.ConfigureSarama(k) } // configure metadata update properties diff --git a/metricbeat/docs/modules/kafka.asciidoc b/metricbeat/docs/modules/kafka.asciidoc index 44fb4b55313..17d7ee314d0 100644 --- a/metricbeat/docs/modules/kafka.asciidoc +++ b/metricbeat/docs/modules/kafka.asciidoc @@ -85,6 +85,10 @@ metricbeat.modules: #username: "" #password: "" + # SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512. + # Defaults to PLAIN when `username` and `password` are configured. + #sasl.mechanism: '' + # Metrics collected from a Kafka broker using Jolokia #- module: kafka # metricsets: diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml index dee8b504d8f..93d75700a69 100644 --- a/metricbeat/metricbeat.reference.yml +++ b/metricbeat/metricbeat.reference.yml @@ -436,6 +436,10 @@ metricbeat.modules: #username: "" #password: "" + # SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512. + # Defaults to PLAIN when `username` and `password` are configured. + #sasl.mechanism: '' + # Metrics collected from a Kafka broker using Jolokia #- module: kafka # metricsets: diff --git a/metricbeat/module/kafka/_meta/config.yml b/metricbeat/module/kafka/_meta/config.yml index ac3fb92b72d..d20a957cc9f 100644 --- a/metricbeat/module/kafka/_meta/config.yml +++ b/metricbeat/module/kafka/_meta/config.yml @@ -30,6 +30,10 @@ #username: "" #password: "" + # SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512. + # Defaults to PLAIN when `username` and `password` are configured. + #sasl.mechanism: '' + # Metrics collected from a Kafka broker using Jolokia #- module: kafka # metricsets: diff --git a/metricbeat/module/kafka/broker.go b/metricbeat/module/kafka/broker.go index 2e558d7944a..11bc0ac2c5c 100644 --- a/metricbeat/module/kafka/broker.go +++ b/metricbeat/module/kafka/broker.go @@ -60,6 +60,7 @@ type BrokerSettings struct { TLS *tls.Config Username, Password string Version kafka.Version + Sasl kafka.SaslConfig } type GroupDescription struct { @@ -91,6 +92,7 @@ func NewBroker(host string, settings BrokerSettings) *Broker { cfg.Net.SASL.Enable = true cfg.Net.SASL.User = user cfg.Net.SASL.Password = settings.Password + settings.Sasl.ConfigureSarama(cfg) } cfg.Version, _ = settings.Version.Get() diff --git a/metricbeat/module/kafka/config.go b/metricbeat/module/kafka/config.go index 8d42af9982b..e3e4aa11866 100644 --- a/metricbeat/module/kafka/config.go +++ b/metricbeat/module/kafka/config.go @@ -21,6 +21,7 @@ import ( "fmt" "time" + "github.com/elastic/beats/v7/libbeat/common/kafka" "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" ) @@ -31,6 +32,7 @@ type metricsetConfig struct { Username string `config:"username"` Password string `config:"password"` ClientID string `config:"client_id"` + Sasl kafka.SaslConfig `config:"sasl"` } var defaultConfig = metricsetConfig{ diff --git a/metricbeat/module/kafka/metricset.go b/metricbeat/module/kafka/metricset.go index 5ec46332b35..ee46788f0f9 100644 --- a/metricbeat/module/kafka/metricset.go +++ b/metricbeat/module/kafka/metricset.go @@ -64,6 +64,7 @@ func NewMetricSet(base mb.BaseMetricSet, options MetricSetOptions) (*MetricSet, Username: config.Username, Password: config.Password, Version: Version(options.Version), + Sasl: config.Sasl, } return &MetricSet{ diff --git a/metricbeat/modules.d/kafka.yml.disabled b/metricbeat/modules.d/kafka.yml.disabled index 089e722028c..1e0db5d517b 100644 --- a/metricbeat/modules.d/kafka.yml.disabled +++ b/metricbeat/modules.d/kafka.yml.disabled @@ -33,6 +33,10 @@ #username: "" #password: "" + # SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512. + # Defaults to PLAIN when `username` and `password` are configured. + #sasl.mechanism: '' + # Metrics collected from a Kafka broker using Jolokia #- module: kafka # metricsets: diff --git a/x-pack/metricbeat/metricbeat.reference.yml b/x-pack/metricbeat/metricbeat.reference.yml index aa4a071c916..4b4d7f76e48 100644 --- a/x-pack/metricbeat/metricbeat.reference.yml +++ b/x-pack/metricbeat/metricbeat.reference.yml @@ -787,6 +787,10 @@ metricbeat.modules: #username: "" #password: "" + # SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512. + # Defaults to PLAIN when `username` and `password` are configured. + #sasl.mechanism: '' + # Metrics collected from a Kafka broker using Jolokia #- module: kafka # metricsets: