Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: KafkaBroker.brokerState should be volatile instead of AtomicReference #10080

Merged
merged 1 commit into from
Feb 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ public Set<KafkaServer> brokersInState(Predicate<BrokerState> desiredState) {

protected boolean hasState(KafkaServer server, Predicate<BrokerState> desiredState) {
try {
return desiredState.test(server.brokerState().get());
return desiredState.test(server.brokerState());
} catch (Throwable e) {
// Broker failed to respond.
return false;
Expand Down
8 changes: 5 additions & 3 deletions core/src/main/scala/kafka/server/KafkaBroker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package kafka.server

import java.util
import java.util.concurrent.atomic.AtomicReference

import com.yammer.metrics.core.MetricName
import kafka.log.LogManager
Expand Down Expand Up @@ -71,8 +70,11 @@ object KafkaBroker {
}

trait KafkaBroker extends KafkaMetricsGroup {
@volatile private var _brokerState: BrokerState = BrokerState.NOT_RUNNING
chia7712 marked this conversation as resolved.
Show resolved Hide resolved

def authorizer: Option[Authorizer]
val brokerState = new AtomicReference[BrokerState](BrokerState.NOT_RUNNING)
def brokerState: BrokerState = _brokerState
protected def brokerState_= (brokerState: BrokerState): Unit = _brokerState = brokerState
def clusterId: String
def config: KafkaConfig
def dataPlaneRequestHandlerPool: KafkaRequestHandlerPool
Expand All @@ -90,7 +92,7 @@ trait KafkaBroker extends KafkaMetricsGroup {
explicitMetricName(KafkaBroker.metricsPrefix, KafkaBroker.metricsTypeName, name, metricTags)
}

newGauge("BrokerState", () => brokerState.get.value())
newGauge("BrokerState", () => brokerState.value)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(this is not related to this PR) it is a bit weird to me that we expose "byte" as "broker state".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. To change this, it would require a KIP.

newGauge("ClusterId", () => clusterId)
newGauge("yammer-metrics-count", () => KafkaYammerMetrics.defaultRegistry.allMetrics.size)

Expand Down
12 changes: 6 additions & 6 deletions core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ class KafkaServer(

val canStartup = isStartingUp.compareAndSet(false, true)
if (canStartup) {
brokerState.set(BrokerState.STARTING)
brokerState = BrokerState.STARTING

/* setup zookeeper */
initZkClient(time)
Expand Down Expand Up @@ -247,7 +247,7 @@ class KafkaServer(
logManager = LogManager(config, initialOfflineDirs,
new ZkConfigRepository(new AdminZkClient(zkClient)),
kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
brokerState.set(BrokerState.RECOVERY)
brokerState = BrokerState.RECOVERY
logManager.startup(zkClient.getAllTopicsInCluster())

metadataCache = MetadataCache.zkMetadataCache(config.brokerId)
Expand Down Expand Up @@ -394,7 +394,7 @@ class KafkaServer(

socketServer.startProcessingRequests(authorizerFutures)

brokerState.set(BrokerState.RUNNING)
brokerState = BrokerState.RUNNING
shutdownLatch = new CountDownLatch(1)
startupComplete.set(true)
isStartingUp.set(false)
Expand Down Expand Up @@ -632,7 +632,7 @@ class KafkaServer(
// the shutdown.
info("Starting controlled shutdown")

brokerState.set(BrokerState.PENDING_CONTROLLED_SHUTDOWN)
brokerState = BrokerState.PENDING_CONTROLLED_SHUTDOWN

val shutdownSucceeded = doControlledShutdown(config.controlledShutdownMaxRetries.intValue)

Expand All @@ -657,7 +657,7 @@ class KafkaServer(
// `true` at the end of this method.
if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) {
CoreUtils.swallow(controlledShutdown(), this)
brokerState.set(BrokerState.SHUTTING_DOWN)
brokerState = BrokerState.SHUTTING_DOWN

if (dynamicConfigManager != null)
CoreUtils.swallow(dynamicConfigManager.shutdown(), this)
Expand Down Expand Up @@ -728,7 +728,7 @@ class KafkaServer(
// Clear all reconfigurable instances stored in DynamicBrokerConfig
config.dynamicConfig.clear()

brokerState.set(BrokerState.NOT_RUNNING)
brokerState = BrokerState.NOT_RUNNING

startupComplete.set(false)
isShuttingDown.set(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ abstract class BaseRequestTest extends IntegrationTestHarness {

def anySocketServer: SocketServer = {
servers.find { server =>
val state = server.brokerState.get()
val state = server.brokerState
state != BrokerState.NOT_RUNNING && state != BrokerState.SHUTTING_DOWN
}.map(_.socketServer).getOrElse(throw new IllegalStateException("No live broker is available"))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ class MetadataRequestTest extends AbstractMetadataRequestTest {
@Test
def testIsrAfterBrokerShutDownAndJoinsBack(): Unit = {
def checkIsr(servers: Seq[KafkaServer], topic: String): Unit = {
val activeBrokers = servers.filter(_.brokerState.get() != BrokerState.NOT_RUNNING)
val activeBrokers = servers.filter(_.brokerState != BrokerState.NOT_RUNNING)
val expectedIsr = activeBrokers.map(_.config.brokerId).toSet

// Assert that topic metadata at new brokers is updated correctly
Expand Down Expand Up @@ -355,7 +355,7 @@ class MetadataRequestTest extends AbstractMetadataRequestTest {
val brokersInController = controllerMetadataResponse.get.brokers.asScala.toSeq.sortBy(_.id)

// Assert that metadata is propagated correctly
servers.filter(_.brokerState.get() != BrokerState.NOT_RUNNING).foreach { broker =>
servers.filter(_.brokerState != BrokerState.NOT_RUNNING).foreach { broker =>
TestUtils.waitUntilTrue(() => {
val metadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build,
Some(brokerSocketServer(broker.config.brokerId)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,10 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
// goes wrong so that awaitShutdown doesn't hang
case e: Exception =>
assertTrue(exceptionClassTag.runtimeClass.isInstance(e), s"Unexpected exception $e")
assertEquals(BrokerState.NOT_RUNNING, server.brokerState.get())
assertEquals(BrokerState.NOT_RUNNING, server.brokerState)
}
finally {
if (server.brokerState.get() != BrokerState.NOT_RUNNING)
if (server.brokerState != BrokerState.NOT_RUNNING)
server.shutdown()
server.awaitShutdown()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
server = new KafkaServer(KafkaConfig.fromProps(props))

server.startup()
TestUtils.waitUntilTrue(() => server.brokerState.get() == BrokerState.RUNNING,
TestUtils.waitUntilTrue(() => server.brokerState == BrokerState.RUNNING,
"waiting for the broker state to become RUNNING")
val brokers = zkClient.getAllBrokersInCluster
assertEquals(1, brokers.size)
Expand Down