Skip to content

Commit

Permalink
MINOR: KIP-631 KafkaConfig fixes and improvements (#10114)
Browse files Browse the repository at this point in the history
Add the new KIP-631 configs to KafkaConfigTest to fix the test failure.

Rename InitialBrokerRegistrationTimeoutMs to
InitialBrokerRegistrationTimeoutMsProp for consistency with the other
properties.

Add ControllerListenerNamesProp as specified in KIP-631.

Give nodeId and brokerId the same value in KafkaConfig.

Reviewers: David Arthur <mumrah@gmail.com
  • Loading branch information
cmccabe committed Feb 12, 2021
1 parent 568038a commit b419d1c
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 18 deletions.
59 changes: 42 additions & 17 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -372,11 +372,12 @@ object KafkaConfig {

/** KIP-500 Configuration */
val ProcessRolesProp = "process.roles"
val InitialBrokerRegistrationTimeoutMs = "initial.broker.registration.timeout.ms"
val InitialBrokerRegistrationTimeoutMsProp = "initial.broker.registration.timeout.ms"
val BrokerHeartbeatIntervalMsProp = "broker.heartbeat.interval.ms"
val BrokerSessionTimeoutMsProp = "broker.session.timeout.ms"
val NodeIdProp = "node.id"
val MetadataLogDirProp = "metadata.log.dir"
val ControllerListenerNamesProp = "controller.listener.names"

/************* Authorizer Configuration ***********/
val AuthorizerClassNameProp = "authorizer.class.name"
Expand Down Expand Up @@ -672,6 +673,8 @@ object KafkaConfig {
"This is required configuration when the self-managed quorum is enabled."
val MetadataLogDirDoc = "This configuration determines where we put the metadata log for clusters upgraded to " +
"KIP-500. If it is not set, the metadata log is placed in the first log directory from log.dirs."
val ControllerListenerNamesDoc = "A comma-separated list of the names of the listeners used by the KIP-500 controller. This is required " +
"if this process is a KIP-500 controller. The ZK-based controller will not use this configuration."

/************* Authorizer Configuration ***********/
val AuthorizerClassNameDoc = s"The fully qualified name of a class that implements s${classOf[Authorizer].getName}" +
Expand Down Expand Up @@ -1073,10 +1076,11 @@ object KafkaConfig {
*/
.defineInternal(ProcessRolesProp, LIST, Collections.emptyList(), ValidList.in("broker", "controller"), HIGH, ProcessRolesDoc)
.defineInternal(NodeIdProp, INT, Defaults.EmptyNodeId, null, HIGH, NodeIdDoc)
.defineInternal(InitialBrokerRegistrationTimeoutMs, INT, Defaults.InitialBrokerRegistrationTimeoutMs, null, MEDIUM, InitialBrokerRegistrationTimeoutMsDoc)
.defineInternal(InitialBrokerRegistrationTimeoutMsProp, INT, Defaults.InitialBrokerRegistrationTimeoutMs, null, MEDIUM, InitialBrokerRegistrationTimeoutMsDoc)
.defineInternal(BrokerHeartbeatIntervalMsProp, INT, Defaults.BrokerHeartbeatIntervalMs, null, MEDIUM, BrokerHeartbeatIntervalMsDoc)
.defineInternal(BrokerSessionTimeoutMsProp, INT, Defaults.BrokerSessionTimeoutMs, null, MEDIUM, BrokerSessionTimeoutMsDoc)
.defineInternal(MetadataLogDirProp, STRING, null, null, HIGH, MetadataLogDirDoc)
.defineInternal(ControllerListenerNamesProp, STRING, null, null, HIGH, ControllerListenerNamesDoc)

/************* Authorizer Configuration ***********/
.define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName, LOW, AuthorizerClassNameDoc)
Expand Down Expand Up @@ -1506,10 +1510,17 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
/** ********* General Configuration ***********/
val brokerIdGenerationEnable: Boolean = getBoolean(KafkaConfig.BrokerIdGenerationEnableProp)
val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp)
var brokerId: Int = getInt(KafkaConfig.BrokerIdProp)
val nodeId: Int = getInt(KafkaConfig.NodeIdProp)
var brokerId: Int = {
val nodeId = getInt(KafkaConfig.NodeIdProp)
if (nodeId < 0) {
getInt(KafkaConfig.BrokerIdProp)
} else {
nodeId
}
}
val nodeId: Int = brokerId
val processRoles: Set[ProcessRole] = parseProcessRoles()
val initialRegistrationTimeoutMs: Int = getInt(KafkaConfig.InitialBrokerRegistrationTimeoutMs)
val initialRegistrationTimeoutMs: Int = getInt(KafkaConfig.InitialBrokerRegistrationTimeoutMsProp)
val brokerHeartbeatIntervalMs: Int = getInt(KafkaConfig.BrokerHeartbeatIntervalMsProp)
val brokerSessionTimeoutMs: Int = getInt(KafkaConfig.BrokerSessionTimeoutMsProp)

Expand Down Expand Up @@ -1797,16 +1808,23 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
}.getOrElse(CoreUtils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port, listenerSecurityProtocolMap))
}

def controllerListenerNames: Seq[String] =
Option(getString(KafkaConfig.ControllerListenerNamesProp)).getOrElse("").split(",")

def controllerListeners: Seq[EndPoint] =
listeners.filter(l => controllerListenerNames.contains(l.listenerName.value()))

def controlPlaneListener: Option[EndPoint] = {
controlPlaneListenerName.map { listenerName =>
listeners.filter(endpoint => endpoint.listenerName.value() == listenerName.value()).head
}
}

def dataPlaneListeners: Seq[EndPoint] = {
Option(getString(KafkaConfig.ControlPlaneListenerNameProp)) match {
case Some(controlPlaneListenerName) => listeners.filterNot(_.listenerName.value() == controlPlaneListenerName)
case None => listeners
listeners.filterNot { listener =>
val name = listener.listenerName.value()
name.equals(getString(KafkaConfig.ControlPlaneListenerNameProp)) ||
controllerListenerNames.contains(name)
}
}

Expand All @@ -1820,7 +1838,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null)
CoreUtils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName + ":" + advertisedPort, listenerSecurityProtocolMap, requireDistinctPorts=false)
else
listeners
listeners.filterNot(l => controllerListenerNames.contains(l.listenerName.value()))
}

private def getInterBrokerListenerNameAndSecurityProtocol: (ListenerName, SecurityProtocol) = {
Expand Down Expand Up @@ -1896,18 +1914,25 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO

val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet
val listenerNames = listeners.map(_.listenerName).toSet
require(advertisedListenerNames.contains(interBrokerListenerName),
s"${KafkaConfig.InterBrokerListenerNameProp} must be a listener name defined in ${KafkaConfig.AdvertisedListenersProp}. " +
s"The valid options based on currently configured listeners are ${advertisedListenerNames.map(_.value).mkString(",")}")
require(advertisedListenerNames.subsetOf(listenerNames),
s"${KafkaConfig.AdvertisedListenersProp} listener names must be equal to or a subset of the ones defined in ${KafkaConfig.ListenersProp}. " +
s"Found ${advertisedListenerNames.map(_.value).mkString(",")}. The valid options based on the current configuration " +
s"are ${listenerNames.map(_.value).mkString(",")}"
)
if (processRoles.isEmpty || processRoles.contains(BrokerRole)) {
require(advertisedListenerNames.contains(interBrokerListenerName),
s"${KafkaConfig.InterBrokerListenerNameProp} must be a listener name defined in ${KafkaConfig.AdvertisedListenersProp}. " +
s"The valid options based on currently configured listeners are ${advertisedListenerNames.map(_.value).mkString(",")}")
require(advertisedListenerNames.subsetOf(listenerNames),
s"${KafkaConfig.AdvertisedListenersProp} listener names must be equal to or a subset of the ones defined in ${KafkaConfig.ListenersProp}. " +
s"Found ${advertisedListenerNames.map(_.value).mkString(",")}. The valid options based on the current configuration " +
s"are ${listenerNames.map(_.value).mkString(",")}"
)
}

require(!advertisedListeners.exists(endpoint => endpoint.host=="0.0.0.0"),
s"${KafkaConfig.AdvertisedListenersProp} cannot use the nonroutable meta-address 0.0.0.0. "+
s"Use a routable IP address.")

// Ensure controller listeners are not in the advertised listeners list
require(!controllerListeners.exists(advertisedListeners.contains),
s"${KafkaConfig.AdvertisedListenersProp} cannot contain any of ${KafkaConfig.ControllerListenerNamesProp}")

// validate controller.listener.name config
if (controlPlaneListenerName.isDefined) {
require(advertisedListenerNames.contains(controlPlaneListenerName.get),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class BrokerLifecycleManagerTest {
properties.setProperty(KafkaConfig.LogDirsProp, "/tmp/foo")
properties.setProperty(KafkaConfig.ProcessRolesProp, "broker")
properties.setProperty(KafkaConfig.NodeIdProp, "1")
properties.setProperty(KafkaConfig.InitialBrokerRegistrationTimeoutMs, "300000")
properties.setProperty(KafkaConfig.InitialBrokerRegistrationTimeoutMsProp, "300000")
properties
}

Expand Down
71 changes: 71 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,30 @@ class KafkaConfigTest {
assertFalse(interBrokerListenerName.value().equals(controlEndpoint.listenerName.value()))
}

@Test
def testControllerListenerName() = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:0,CONTROLPLANE://localhost:4000,CONTROLLER://localhost:5000")
props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLPLANE:SSL,CONTROLLER:SASL_SSL")
props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:0,CONTROLPLANE://localhost:4000")
props.put(KafkaConfig.ControlPlaneListenerNameProp, "CONTROLPLANE")
props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
assertTrue(isValidKafkaConfig(props))

val serverConfig = KafkaConfig.fromProps(props)
val controlPlaneEndpoint = serverConfig.controlPlaneListener.get
assertEquals("localhost", controlPlaneEndpoint.host)
assertEquals(4000, controlPlaneEndpoint.port)
assertEquals(SecurityProtocol.SSL, controlPlaneEndpoint.securityProtocol)

val controllerEndpoints = serverConfig.controllerListeners
assertEquals(1, controllerEndpoints.size)
val controllerEndpoint = controllerEndpoints.iterator.next()
assertEquals("localhost", controllerEndpoint.host)
assertEquals(5000, controllerEndpoint.port)
assertEquals(SecurityProtocol.SASL_SSL, controllerEndpoint.securityProtocol)
}

@Test
def testBadListenerProtocol(): Unit = {
val props = new Properties()
Expand Down Expand Up @@ -619,8 +643,13 @@ class KafkaConfigTest {
case KafkaConfig.ConnectionSetupTimeoutMaxMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")

// KIP-500 Configurations
case KafkaConfig.ProcessRolesProp => // ignore
case KafkaConfig.InitialBrokerRegistrationTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.BrokerHeartbeatIntervalMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.BrokerSessionTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.NodeIdProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.MetadataLogDirProp => // ignore string
case KafkaConfig.ControllerListenerNamesProp => // ignore string

case KafkaConfig.AuthorizerClassNameProp => //ignore string
case KafkaConfig.CreateTopicPolicyClassNameProp => //ignore string
Expand Down Expand Up @@ -962,6 +991,48 @@ class KafkaConfigTest {
}
}

def assertDistinctControllerAndAdvertisedListeners(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
props.put(KafkaConfig.ListenersProp, listeners)
props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://A:9092,SSL://B:9093")
// Valid now
assertTrue(isValidKafkaConfig(props))

// Still valid
val controllerListeners = "SASL_SSL"
props.put(KafkaConfig.ControllerListenerNamesProp, controllerListeners)
assertTrue(isValidKafkaConfig(props))
}

@Test
def assertAllControllerListenerCannotBeAdvertised(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
props.put(KafkaConfig.ListenersProp, listeners)
props.put(KafkaConfig.AdvertisedListenersProp, listeners)
// Valid now
assertTrue(isValidKafkaConfig(props))

// Invalid now
props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT,SSL,SASL_SSL")
assertFalse(isValidKafkaConfig(props))
}

@Test
def assertEvenOneControllerListenerCannotBeAdvertised(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
props.put(KafkaConfig.ListenersProp, listeners)
props.put(KafkaConfig.AdvertisedListenersProp, listeners)
// Valid now
assertTrue(isValidKafkaConfig(props))

// Invalid now
props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
assertFalse(isValidKafkaConfig(props))
}

@Test
def testInvalidQuorumVotersConfig(): Unit = {
assertInvalidQuorumVoters("1")
Expand Down

0 comments on commit b419d1c

Please sign in to comment.