Skip to content

Commit

Permalink
MINOR: KafkaBroker.brokerState should be volatile instead of AtomicRe…
Browse files Browse the repository at this point in the history
…ference (#10080)

We don't need or use the additional functionality provided by
AtomicReference.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, David Arthur <mumrah@gmail.com>
  • Loading branch information
ijuma committed Feb 10, 2021
1 parent 19506b6 commit ad541b9
Show file tree
Hide file tree
Showing 7 changed files with 18 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ public Set<KafkaServer> brokersInState(Predicate<BrokerState> desiredState) {

protected boolean hasState(KafkaServer server, Predicate<BrokerState> desiredState) {
try {
return desiredState.test(server.brokerState().get());
return desiredState.test(server.brokerState());
} catch (Throwable e) {
// Broker failed to respond.
return false;
Expand Down
8 changes: 5 additions & 3 deletions core/src/main/scala/kafka/server/KafkaBroker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package kafka.server

import java.util
import java.util.concurrent.atomic.AtomicReference

import com.yammer.metrics.core.MetricName
import kafka.log.LogManager
Expand Down Expand Up @@ -71,8 +70,11 @@ object KafkaBroker {
}

trait KafkaBroker extends KafkaMetricsGroup {
@volatile private var _brokerState: BrokerState = BrokerState.NOT_RUNNING

def authorizer: Option[Authorizer]
val brokerState = new AtomicReference[BrokerState](BrokerState.NOT_RUNNING)
def brokerState: BrokerState = _brokerState
protected def brokerState_= (brokerState: BrokerState): Unit = _brokerState = brokerState
def clusterId: String
def config: KafkaConfig
def dataPlaneRequestHandlerPool: KafkaRequestHandlerPool
Expand All @@ -90,7 +92,7 @@ trait KafkaBroker extends KafkaMetricsGroup {
explicitMetricName(KafkaBroker.metricsPrefix, KafkaBroker.metricsTypeName, name, metricTags)
}

newGauge("BrokerState", () => brokerState.get.value())
newGauge("BrokerState", () => brokerState.value)
newGauge("ClusterId", () => clusterId)
newGauge("yammer-metrics-count", () => KafkaYammerMetrics.defaultRegistry.allMetrics.size)

Expand Down
12 changes: 6 additions & 6 deletions core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ class KafkaServer(

val canStartup = isStartingUp.compareAndSet(false, true)
if (canStartup) {
brokerState.set(BrokerState.STARTING)
brokerState = BrokerState.STARTING

/* setup zookeeper */
initZkClient(time)
Expand Down Expand Up @@ -247,7 +247,7 @@ class KafkaServer(
logManager = LogManager(config, initialOfflineDirs,
new ZkConfigRepository(new AdminZkClient(zkClient)),
kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
brokerState.set(BrokerState.RECOVERY)
brokerState = BrokerState.RECOVERY
logManager.startup(zkClient.getAllTopicsInCluster())

metadataCache = MetadataCache.zkMetadataCache(config.brokerId)
Expand Down Expand Up @@ -394,7 +394,7 @@ class KafkaServer(

socketServer.startProcessingRequests(authorizerFutures)

brokerState.set(BrokerState.RUNNING)
brokerState = BrokerState.RUNNING
shutdownLatch = new CountDownLatch(1)
startupComplete.set(true)
isStartingUp.set(false)
Expand Down Expand Up @@ -632,7 +632,7 @@ class KafkaServer(
// the shutdown.
info("Starting controlled shutdown")

brokerState.set(BrokerState.PENDING_CONTROLLED_SHUTDOWN)
brokerState = BrokerState.PENDING_CONTROLLED_SHUTDOWN

val shutdownSucceeded = doControlledShutdown(config.controlledShutdownMaxRetries.intValue)

Expand All @@ -657,7 +657,7 @@ class KafkaServer(
// `true` at the end of this method.
if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) {
CoreUtils.swallow(controlledShutdown(), this)
brokerState.set(BrokerState.SHUTTING_DOWN)
brokerState = BrokerState.SHUTTING_DOWN

if (dynamicConfigManager != null)
CoreUtils.swallow(dynamicConfigManager.shutdown(), this)
Expand Down Expand Up @@ -728,7 +728,7 @@ class KafkaServer(
// Clear all reconfigurable instances stored in DynamicBrokerConfig
config.dynamicConfig.clear()

brokerState.set(BrokerState.NOT_RUNNING)
brokerState = BrokerState.NOT_RUNNING

startupComplete.set(false)
isShuttingDown.set(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ abstract class BaseRequestTest extends IntegrationTestHarness {

def anySocketServer: SocketServer = {
servers.find { server =>
val state = server.brokerState.get()
val state = server.brokerState
state != BrokerState.NOT_RUNNING && state != BrokerState.SHUTTING_DOWN
}.map(_.socketServer).getOrElse(throw new IllegalStateException("No live broker is available"))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ class MetadataRequestTest extends AbstractMetadataRequestTest {
@Test
def testIsrAfterBrokerShutDownAndJoinsBack(): Unit = {
def checkIsr(servers: Seq[KafkaServer], topic: String): Unit = {
val activeBrokers = servers.filter(_.brokerState.get() != BrokerState.NOT_RUNNING)
val activeBrokers = servers.filter(_.brokerState != BrokerState.NOT_RUNNING)
val expectedIsr = activeBrokers.map(_.config.brokerId).toSet

// Assert that topic metadata at new brokers is updated correctly
Expand Down Expand Up @@ -355,7 +355,7 @@ class MetadataRequestTest extends AbstractMetadataRequestTest {
val brokersInController = controllerMetadataResponse.get.brokers.asScala.toSeq.sortBy(_.id)

// Assert that metadata is propagated correctly
servers.filter(_.brokerState.get() != BrokerState.NOT_RUNNING).foreach { broker =>
servers.filter(_.brokerState != BrokerState.NOT_RUNNING).foreach { broker =>
TestUtils.waitUntilTrue(() => {
val metadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build,
Some(brokerSocketServer(broker.config.brokerId)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,10 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
// goes wrong so that awaitShutdown doesn't hang
case e: Exception =>
assertTrue(exceptionClassTag.runtimeClass.isInstance(e), s"Unexpected exception $e")
assertEquals(BrokerState.NOT_RUNNING, server.brokerState.get())
assertEquals(BrokerState.NOT_RUNNING, server.brokerState)
}
finally {
if (server.brokerState.get() != BrokerState.NOT_RUNNING)
if (server.brokerState != BrokerState.NOT_RUNNING)
server.shutdown()
server.awaitShutdown()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
server = new KafkaServer(KafkaConfig.fromProps(props))

server.startup()
TestUtils.waitUntilTrue(() => server.brokerState.get() == BrokerState.RUNNING,
TestUtils.waitUntilTrue(() => server.brokerState == BrokerState.RUNNING,
"waiting for the broker state to become RUNNING")
val brokers = zkClient.getAllBrokersInCluster
assertEquals(1, brokers.size)
Expand Down

0 comments on commit ad541b9

Please sign in to comment.