diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 0ecb48cd23cc..2fd04ae501b3 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -372,11 +372,12 @@ object KafkaConfig { /** KIP-500 Configuration */ val ProcessRolesProp = "process.roles" - val InitialBrokerRegistrationTimeoutMs = "initial.broker.registration.timeout.ms" + val InitialBrokerRegistrationTimeoutMsProp = "initial.broker.registration.timeout.ms" val BrokerHeartbeatIntervalMsProp = "broker.heartbeat.interval.ms" val BrokerSessionTimeoutMsProp = "broker.session.timeout.ms" val NodeIdProp = "node.id" val MetadataLogDirProp = "metadata.log.dir" + val ControllerListenerNamesProp = "controller.listener.names" /************* Authorizer Configuration ***********/ val AuthorizerClassNameProp = "authorizer.class.name" @@ -672,6 +673,8 @@ object KafkaConfig { "This is required configuration when the self-managed quorum is enabled." val MetadataLogDirDoc = "This configuration determines where we put the metadata log for clusters upgraded to " + "KIP-500. If it is not set, the metadata log is placed in the first log directory from log.dirs." + val ControllerListenerNamesDoc = "A comma-separated list of the names of the listeners used by the KIP-500 controller. This is required " + + "if this process is a KIP-500 controller. The ZK-based controller will not use this configuration." 
/************* Authorizer Configuration ***********/ val AuthorizerClassNameDoc = s"The fully qualified name of a class that implements s${classOf[Authorizer].getName}" + @@ -1073,10 +1076,11 @@ object KafkaConfig { */ .defineInternal(ProcessRolesProp, LIST, Collections.emptyList(), ValidList.in("broker", "controller"), HIGH, ProcessRolesDoc) .defineInternal(NodeIdProp, INT, Defaults.EmptyNodeId, null, HIGH, NodeIdDoc) - .defineInternal(InitialBrokerRegistrationTimeoutMs, INT, Defaults.InitialBrokerRegistrationTimeoutMs, null, MEDIUM, InitialBrokerRegistrationTimeoutMsDoc) + .defineInternal(InitialBrokerRegistrationTimeoutMsProp, INT, Defaults.InitialBrokerRegistrationTimeoutMs, null, MEDIUM, InitialBrokerRegistrationTimeoutMsDoc) .defineInternal(BrokerHeartbeatIntervalMsProp, INT, Defaults.BrokerHeartbeatIntervalMs, null, MEDIUM, BrokerHeartbeatIntervalMsDoc) .defineInternal(BrokerSessionTimeoutMsProp, INT, Defaults.BrokerSessionTimeoutMs, null, MEDIUM, BrokerSessionTimeoutMsDoc) .defineInternal(MetadataLogDirProp, STRING, null, null, HIGH, MetadataLogDirDoc) + .defineInternal(ControllerListenerNamesProp, STRING, null, null, HIGH, ControllerListenerNamesDoc) /************* Authorizer Configuration ***********/ .define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName, LOW, AuthorizerClassNameDoc) @@ -1506,10 +1510,17 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO /** ********* General Configuration ***********/ val brokerIdGenerationEnable: Boolean = getBoolean(KafkaConfig.BrokerIdGenerationEnableProp) val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp) - var brokerId: Int = getInt(KafkaConfig.BrokerIdProp) - val nodeId: Int = getInt(KafkaConfig.NodeIdProp) + var brokerId: Int = { + val nodeId = getInt(KafkaConfig.NodeIdProp) + if (nodeId < 0) { + getInt(KafkaConfig.BrokerIdProp) + } else { + nodeId + } + } + val nodeId: Int = brokerId val processRoles: Set[ProcessRole] = parseProcessRoles() - 
val initialRegistrationTimeoutMs: Int = getInt(KafkaConfig.InitialBrokerRegistrationTimeoutMs) + val initialRegistrationTimeoutMs: Int = getInt(KafkaConfig.InitialBrokerRegistrationTimeoutMsProp) val brokerHeartbeatIntervalMs: Int = getInt(KafkaConfig.BrokerHeartbeatIntervalMsProp) val brokerSessionTimeoutMs: Int = getInt(KafkaConfig.BrokerSessionTimeoutMsProp) @@ -1797,6 +1808,12 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO }.getOrElse(CoreUtils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port, listenerSecurityProtocolMap)) } + def controllerListenerNames: Seq[String] = + Option(getString(KafkaConfig.ControllerListenerNamesProp)).getOrElse("").split(",") + + def controllerListeners: Seq[EndPoint] = + listeners.filter(l => controllerListenerNames.contains(l.listenerName.value())) + def controlPlaneListener: Option[EndPoint] = { controlPlaneListenerName.map { listenerName => listeners.filter(endpoint => endpoint.listenerName.value() == listenerName.value()).head @@ -1804,9 +1821,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO } def dataPlaneListeners: Seq[EndPoint] = { - Option(getString(KafkaConfig.ControlPlaneListenerNameProp)) match { - case Some(controlPlaneListenerName) => listeners.filterNot(_.listenerName.value() == controlPlaneListenerName) - case None => listeners + listeners.filterNot { listener => + val name = listener.listenerName.value() + name.equals(getString(KafkaConfig.ControlPlaneListenerNameProp)) || + controllerListenerNames.contains(name) } } @@ -1820,7 +1838,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null) CoreUtils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName + ":" + advertisedPort, listenerSecurityProtocolMap, requireDistinctPorts=false) else - listeners + listeners.filterNot(l => 
controllerListenerNames.contains(l.listenerName.value())) } private def getInterBrokerListenerNameAndSecurityProtocol: (ListenerName, SecurityProtocol) = { @@ -1896,18 +1914,25 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet val listenerNames = listeners.map(_.listenerName).toSet - require(advertisedListenerNames.contains(interBrokerListenerName), - s"${KafkaConfig.InterBrokerListenerNameProp} must be a listener name defined in ${KafkaConfig.AdvertisedListenersProp}. " + - s"The valid options based on currently configured listeners are ${advertisedListenerNames.map(_.value).mkString(",")}") - require(advertisedListenerNames.subsetOf(listenerNames), - s"${KafkaConfig.AdvertisedListenersProp} listener names must be equal to or a subset of the ones defined in ${KafkaConfig.ListenersProp}. " + - s"Found ${advertisedListenerNames.map(_.value).mkString(",")}. The valid options based on the current configuration " + - s"are ${listenerNames.map(_.value).mkString(",")}" - ) + if (processRoles.isEmpty || processRoles.contains(BrokerRole)) { + require(advertisedListenerNames.contains(interBrokerListenerName), + s"${KafkaConfig.InterBrokerListenerNameProp} must be a listener name defined in ${KafkaConfig.AdvertisedListenersProp}. " + + s"The valid options based on currently configured listeners are ${advertisedListenerNames.map(_.value).mkString(",")}") + require(advertisedListenerNames.subsetOf(listenerNames), + s"${KafkaConfig.AdvertisedListenersProp} listener names must be equal to or a subset of the ones defined in ${KafkaConfig.ListenersProp}. " + + s"Found ${advertisedListenerNames.map(_.value).mkString(",")}. 
The valid options based on the current configuration " + + s"are ${listenerNames.map(_.value).mkString(",")}" + ) + } + require(!advertisedListeners.exists(endpoint => endpoint.host=="0.0.0.0"), s"${KafkaConfig.AdvertisedListenersProp} cannot use the nonroutable meta-address 0.0.0.0. "+ s"Use a routable IP address.") + // Ensure controller listeners are not in the advertised listeners list + require(!controllerListeners.exists(advertisedListeners.contains), + s"${KafkaConfig.AdvertisedListenersProp} cannot contain any of ${KafkaConfig.ControllerListenerNamesProp}") + // validate control.plane.listener.name config if (controlPlaneListenerName.isDefined) { require(advertisedListenerNames.contains(controlPlaneListenerName.get), diff --git a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala index a823ce63bed8..7544a463bed2 100644 --- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala @@ -46,7 +46,7 @@ class BrokerLifecycleManagerTest { properties.setProperty(KafkaConfig.LogDirsProp, "/tmp/foo") properties.setProperty(KafkaConfig.ProcessRolesProp, "broker") properties.setProperty(KafkaConfig.NodeIdProp, "1") - properties.setProperty(KafkaConfig.InitialBrokerRegistrationTimeoutMs, "300000") + properties.setProperty(KafkaConfig.InitialBrokerRegistrationTimeoutMsProp, "300000") properties } diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 427bc136137d..d6c456b28889 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -261,6 +261,30 @@ class KafkaConfigTest { assertFalse(interBrokerListenerName.value().equals(controlEndpoint.listenerName.value())) } + @Test + def testControllerListenerName() = { + val props = 
TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) + props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:0,CONTROLPLANE://localhost:4000,CONTROLLER://localhost:5000") + props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLPLANE:SSL,CONTROLLER:SASL_SSL") + props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:0,CONTROLPLANE://localhost:4000") + props.put(KafkaConfig.ControlPlaneListenerNameProp, "CONTROLPLANE") + props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER") + assertTrue(isValidKafkaConfig(props)) + + val serverConfig = KafkaConfig.fromProps(props) + val controlPlaneEndpoint = serverConfig.controlPlaneListener.get + assertEquals("localhost", controlPlaneEndpoint.host) + assertEquals(4000, controlPlaneEndpoint.port) + assertEquals(SecurityProtocol.SSL, controlPlaneEndpoint.securityProtocol) + + val controllerEndpoints = serverConfig.controllerListeners + assertEquals(1, controllerEndpoints.size) + val controllerEndpoint = controllerEndpoints.iterator.next() + assertEquals("localhost", controllerEndpoint.host) + assertEquals(5000, controllerEndpoint.port) + assertEquals(SecurityProtocol.SASL_SSL, controllerEndpoint.securityProtocol) + } + @Test def testBadListenerProtocol(): Unit = { val props = new Properties() @@ -619,8 +643,13 @@ class KafkaConfigTest { case KafkaConfig.ConnectionSetupTimeoutMaxMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") // KIP-500 Configurations + case KafkaConfig.ProcessRolesProp => // ignore + case KafkaConfig.InitialBrokerRegistrationTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.BrokerHeartbeatIntervalMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") + case KafkaConfig.BrokerSessionTimeoutMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number") case KafkaConfig.NodeIdProp => assertPropertyInvalid(baseProperties, name, "not_a_number") case 
KafkaConfig.MetadataLogDirProp => // ignore string + case KafkaConfig.ControllerListenerNamesProp => // ignore string case KafkaConfig.AuthorizerClassNameProp => //ignore string case KafkaConfig.CreateTopicPolicyClassNameProp => //ignore string @@ -962,6 +991,48 @@ class KafkaConfigTest { } } + @Test + def assertDistinctControllerAndAdvertisedListeners(): Unit = { + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = TestUtils.MockZkPort) + val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094" + props.put(KafkaConfig.ListenersProp, listeners) + props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://A:9092,SSL://B:9093") + // Valid now + assertTrue(isValidKafkaConfig(props)) + + // Still valid + val controllerListeners = "SASL_SSL" + props.put(KafkaConfig.ControllerListenerNamesProp, controllerListeners) + assertTrue(isValidKafkaConfig(props)) + } + + @Test + def assertAllControllerListenerCannotBeAdvertised(): Unit = { + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = TestUtils.MockZkPort) + val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094" + props.put(KafkaConfig.ListenersProp, listeners) + props.put(KafkaConfig.AdvertisedListenersProp, listeners) + // Valid now + assertTrue(isValidKafkaConfig(props)) + + // Invalid now + props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT,SSL,SASL_SSL") + assertFalse(isValidKafkaConfig(props)) + } + + @Test + def assertEvenOneControllerListenerCannotBeAdvertised(): Unit = { + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = TestUtils.MockZkPort) + val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094" + props.put(KafkaConfig.ListenersProp, listeners) + props.put(KafkaConfig.AdvertisedListenersProp, listeners) + // Valid now + assertTrue(isValidKafkaConfig(props)) + + // Invalid now + props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT") + assertFalse(isValidKafkaConfig(props)) + } + + @Test 
def testInvalidQuorumVotersConfig(): Unit = { assertInvalidQuorumVoters("1")