MINOR: Introduce the KIP-500 Broker lifecycle manager #10095
The BrokerLifecycleManager handles broker state transitions for the KIP-500 broker. This includes sending broker registrations, heartbeats, and controlled shutdown requests.
Thanks for the PR. A few comments below. Also, do you have a reference to how this class is used?
/**
 * How long to wait for registration to succeed before failing the startup process.
 */
private val initialTimeoutNs = NANOSECONDS.
Nit: Isn't TimeUnit.MILLISECONDS.toNanos(config.initialRegistrationTimeoutMs) more readable?
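To make the nit concrete, here is a minimal sketch comparing the two spellings. It assumes the diff uses the `NANOSECONDS.convert(value, MILLISECONDS)` form that appears later in this PR; the object name and the timeout value are hypothetical stand-ins for `config.initialRegistrationTimeoutMs`.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeUnit.{MILLISECONDS, NANOSECONDS}

object TimeoutConversion {
  // Hypothetical stand-in for config.initialRegistrationTimeoutMs.
  val initialRegistrationTimeoutMs: Int = 60000

  // Target unit first, then the value and its source unit.
  val viaConvert: Long =
    NANOSECONDS.convert(initialRegistrationTimeoutMs.toLong, MILLISECONDS)

  // Source unit first, with the target unit spelled out in the method name.
  val viaToNanos: Long =
    TimeUnit.MILLISECONDS.toNanos(initialRegistrationTimeoutMs.toLong)
}
```

Both expressions yield the same number of nanoseconds, so the choice is purely about which one reads more naturally at the call site.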
 * The exponential backoff to use for resending communication.
 */
private val resendExponentialBackoff =
  new ExponentialBackoff(100, 2, config.brokerSessionTimeoutMs.toLong, 0.1)
We seem to use 0.2 as jitter elsewhere. Any reason to deviate? Maybe we should have an ExponentialBackoff static factory method that only takes the initial interval and the max interval. It seems that the other two parameters can be the same most of the time.
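One way the suggested factory could look, sketched with a self-contained stand-in class rather than Kafka's own `ExponentialBackoff`. The name `BackoffSketch`, the default values, and the way jitter is applied are all assumptions for illustration.

```scala
import java.util.concurrent.ThreadLocalRandom

// Hypothetical stand-in; not Kafka's ExponentialBackoff class.
final class BackoffSketch(initialMs: Long, multiplier: Int, maxMs: Long, jitter: Double) {
  // Exponential growth capped at maxMs, then scaled by a random factor
  // drawn from [1 - jitter, 1 + jitter) when jitter is positive.
  def backoff(attempts: Int): Long = {
    val base = math.min(maxMs.toDouble, initialMs * math.pow(multiplier.toDouble, attempts.toDouble))
    val factor =
      if (jitter <= 0.0) 1.0
      else ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter)
    (base * factor).toLong
  }
}

object BackoffSketch {
  // Assumed shared defaults: the multiplier from the diff and the 0.2 jitter mentioned above.
  val DefaultMultiplier = 2
  val DefaultJitter = 0.2

  // Factory that takes only the two values that usually differ between call sites.
  def apply(initialMs: Long, maxMs: Long): BackoffSketch =
    new BackoffSketch(initialMs, DefaultMultiplier, maxMs, DefaultJitter)
}
```

A call site would then shrink to something like `BackoffSketch(100, sessionTimeoutMs)`, while the four-argument constructor stays available for the cases that do need to deviate.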
20% jitter is kind of a lot for the heartbeat, I think. 20% means we could be heartbeating every 2.4 s rather than 2 s.
Thinking about it more I'd be more comfortable with 2%. The point of jitter is just to avoid having a lot of heartbeats come in all at once, and I think 2% jitter is more than sufficient to accomplish that. With a very small injection of randomness each time the heartbeat times will drift apart naturally, without the need for big divergences.
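The arithmetic behind those numbers can be checked with a small sketch. It assumes jitter is applied symmetrically as a multiplicative factor between 1 - jitter and 1 + jitter; the helper name is hypothetical.

```scala
object JitterRange {
  // Lower and upper bounds of a jittered interval, assuming the interval is
  // multiplied by a factor between (1 - jitter) and (1 + jitter).
  def bounds(intervalMs: Long, jitter: Double): (Long, Long) =
    (math.round(intervalMs * (1 - jitter)), math.round(intervalMs * (1 + jitter)))
}
```

Under that assumption, a 2000 ms heartbeat interval spans 1600 to 2400 ms with 20% jitter, but only 1960 to 2040 ms with 2% jitter.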
import scala.jdk.CollectionConverters._

class BrokerLifecycleManager(val config: KafkaConfig,
Worth adding a comment explaining the thread safety aspects of this class and also its general purpose. I see a few volatile fields, but failedAttempts is not volatile (for example).
I wonder if it might be helpful to assign each var into one of two buckets: stuff that can only be written to from the event queue thread but that can be read from any thread (must be @volatile), and stuff that is only used from within the event queue (doesn't need to be). At a minimum put these into different commented sections, but maybe even create a single container object for the @volatile ones:
private case class EventQueueThreadOwnedVars(@volatile var _brokerEpoch: Long = -1L,
                                             @volatile var _state: BrokerState = BrokerState.NOT_RUNNING)
val eventQueueThreadOwnedVars = EventQueueThreadOwnedVars()
I added some JavaDoc for the class as a whole explaining the overall paradigm.
I think putting all the mutable state into a separate class is a bit too extreme. I added notes about how each mutable variable should be used, however.
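As a rough illustration of the two buckets discussed in this thread, the sketch below keeps both kinds of field in one class and separates them by comment. It is a simplified, hypothetical class (a single-threaded executor stands in for the event queue), not the BrokerLifecycleManager from the PR.

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Hypothetical, simplified manager used only to illustrate the threading convention.
class OwnedStateSketch {
  // A single-threaded executor stands in for the event queue thread.
  private val eventQueue = Executors.newSingleThreadExecutor()

  // Bucket 1: written only on the event queue thread, readable from any thread.
  @volatile private var _brokerEpoch: Long = -1L

  // Bucket 2: read and written only on the event queue thread, so no @volatile.
  private var failedAttempts: Int = 0

  // Safe to call from any thread because the field is volatile.
  def brokerEpoch: Long = _brokerEpoch

  // Every mutation is funneled through the single event queue thread.
  def onRegistered(epoch: Long): Unit = eventQueue.execute { () =>
    failedAttempts = 0
    _brokerEpoch = epoch
  }

  def onFailure(): Unit = eventQueue.execute(() => failedAttempts += 1)

  // Let queued events finish, then stop the thread.
  def close(): Unit = {
    eventQueue.shutdown()
    eventQueue.awaitTermination(5, TimeUnit.SECONDS)
  }
}
```

The convention only holds as long as no code outside the event queue touches the second bucket, which is why documenting the intended use of each variable matters.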
channelManager: BrokerToControllerChannelManager,
clusterId: Uuid,
advertisedListeners: ListenerCollection,
supportedFeatures: util.Map[String, VersionRange]): Unit = {
Why are we using a Java collection?
supportedFeatures is a static final map which basically explains what features the software supports. So the intention is for it to be defined in common code which happens to be written in Java (not included in this PR). It seemed simpler not to translate the map into scala, although I guess it's not too difficult either way.
def state(): BrokerState = _state

class BeginControlledShutdownEvent extends EventQueue.Event {
Are these events meant to be public?
That's a good point; these can all be private. Will fix.
@@ -1497,6 +1509,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
var brokerId: Int = getInt(KafkaConfig.BrokerIdProp)
val nodeId: Int = getInt(KafkaConfig.NodeIdProp)
val processRoles: Set[ProcessRole] = parseProcessRoles()
val initialRegistrationTimeoutMs = getInt(KafkaConfig.InitialBrokerRegistrationTimeoutMs)
val brokerHeartbeatIntervalMs = getInt(KafkaConfig.BrokerHeartbeatIntervalMsProp)
val brokerSessionTimeoutMs = getInt(KafkaConfig.BrokerSessionTimeoutMsProp)
It's worth including the explicit type here since it's a public val. Are these meant to be nullable? It may be better to make them Option[Int] if so. Otherwise, we should make them Int so that they're easier to use (they're currently being inferred as Integer).
Good point. I added the explicit type of Int.
I don't think they need to be nullable... they have reasonable defaults as usual in KafkaConfig. And they will be ignored altogether when in ZK mode, of course.
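The inference issue raised in this thread can be reproduced in isolation. In the sketch below, `getInt` is a hypothetical stand-in that returns a boxed `java.lang.Integer`, as a Java-defined getter would; the property names and values are made up.

```scala
object ConfigTypes {
  // Hypothetical backing store; the values are boxed java.lang.Integer instances.
  private val props: Map[String, Integer] =
    Map("broker.heartbeat.interval.ms" -> Integer.valueOf(2000))

  // Stand-in for a Java-style getter: returns Integer, or null when the key is absent.
  def getInt(key: String): Integer = props.getOrElse(key, null)

  // Without an annotation, the val is inferred as java.lang.Integer.
  val inferred = getInt("broker.heartbeat.interval.ms")

  // With an explicit Int, the boxed value is unboxed through Scala's standard conversion.
  val explicit: Int = getInt("broker.heartbeat.interval.ms")

  // If a value really could be absent, Option(...) turns null into None.
  val optional: Option[Int] = Option(getInt("missing.key")).map(_.intValue)
}
```

Note that annotating a val as `Int` would throw a NullPointerException on unboxing if the getter ever returned null, so the annotation is only safe when a default is guaranteed.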
Thanks @cmccabe, this looks nice. I like the event driven nature of this manager along with the state machine 👍
One thing I don't love is how BrokerLifecycleManager can be instantiated but some of the members are not initialized until start is called. Is there no way we can defer initialization until all the dependencies of this class are available?
// schedule our next heartbeat a little bit sooner than we usually would.
// In the case where controlled shutdown completes quickly, this will
// speed things up a little bit.
scheduleNextCommunication(NANOSECONDS.convert(50, MILLISECONDS))
Since the heartbeat interval is configurable, maybe we should calculate this from the interval instead of a fixed value. Maybe something like (interval / 2) or sqrt(interval)?
Hmm. I'm not sure. Even if the heartbeat interval was 1 second or 3 seconds (just for example) I don't think we'd want this interval to change.
Yea, fair enough. Maybe we can at least consolidate on one magic "short" time and make it a constant? Now we have 10ms and 50ms hard coded.
override def run(): Unit = {
  _highestMetadataOffsetProvider = highestMetadataOffsetProvider
  _channelManager = channelManager
  _channelManager.start()
If this class is owning the lifecycle for _channelManager, should it not also instantiate it?
We do it this way to make unit tests easier, since we need to supply a mock channel manager at times
I guess we could have the responsibility for closing the channel manager lie with the external callers. Would that be better here? It would be clearer, I guess.
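A minimal sketch of that alternative, in which the caller owns the channel manager and the lifecycle manager only borrows it. Every type here is a hypothetical simplification; the real BrokerToControllerChannelManager has a richer interface.

```scala
// Hypothetical minimal interface standing in for the channel manager.
trait ChannelManagerSketch {
  def start(): Unit
  def shutdown(): Unit
}

// Test double that records lifecycle calls, as a unit test might supply.
final class RecordingChannelManager extends ChannelManagerSketch {
  var started = false
  var stopped = false
  def start(): Unit = started = true
  def shutdown(): Unit = stopped = true
}

// Hypothetical manager that borrows the channel manager without owning its lifecycle.
final class BorrowingLifecycleManager(channelManager: ChannelManagerSketch) {
  // Would release only the resources this class created itself.
  def close(): Unit = ()
}

object OwnershipDemo {
  // The caller creates, starts, and shuts down the channel manager.
  def run(): RecordingChannelManager = {
    val channel = new RecordingChannelManager
    channel.start()
    val manager = new BorrowingLifecycleManager(channel)
    try manager.close()
    finally channel.shutdown()
    channel
  }
}
```

This keeps the mock-injection benefit for unit tests while making it clear which component is responsible for shutting the channel manager down.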
new ExponentialBackoff(100, 2, config.brokerSessionTimeoutMs.toLong, 0.1)

/**
 * The number of tries we've tried to communicate.
nit: append " and failed" or s/tried/failed/
private var gotControlledShutdownResponse = false

/**
 * Whether or not we this broker is registered with the controller quorum.
nit: s/we this/this/
override def run(): Unit = {
  _state match {
    case BrokerState.PENDING_CONTROLLED_SHUTDOWN =>
      info(s"Attempted to enter controlled shutdown state, but we are already in " +
nit: s/controlled/pending controlled/
/**
 * The broker rack, or null if there is no configured rack.
 */
private val rack = config.rack.orNull
Make an Option[String]? No need to propagate the unfortunate fact that rack can be null elsewhere?
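A sketch of what this suggestion could look like: keep the rack as an `Option[String]` in Scala code and convert to null only where a Java-style API requires it. The `RegistrationRequest` class below is a hypothetical stand-in, not the request type used in the PR.

```scala
object RackHandling {
  // Hypothetical Java-style request object that uses null to mean "no rack".
  final class RegistrationRequest {
    private var rack: String = null
    def setRack(value: String): RegistrationRequest = { rack = value; this }
    def getRack: String = rack
  }

  // The Option stays inside Scala code; null appears only at the Java boundary.
  def buildRequest(rack: Option[String]): RegistrationRequest =
    new RegistrationRequest().setRack(rack.orNull)
}
```

With this shape, the rest of the class can pattern match or map over the rack without null checks.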
    manager.controlledShutdownFuture.get()
    manager.close()
  }
}
nit: missing newline
Thanks for the reviews, @mumrah!
I think the next PR in the series makes things clearer. The
Thanks for the explanation on the initialization/startup thing @cmccabe. I think this LGTM 👍
Add the KIP-500 broker lifecycle manager. It owns the broker state. Its inputs are messages passed in from other parts of the broker and from the controller: requests to start up, or shut down, for example. Its outputs are the broker state and various futures that can be used to wait for broker state transitions to occur.
The lifecycle manager handles registering the broker with the controller, as described in KIP-631. After registration is complete, it handles sending periodic broker heartbeats and processing the responses.
Reviewers: David Arthur <mumrah@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Ron Dagostino <rdagostino@confluent.io>
…e-allocations-lz4
* apache-github/trunk: (118 commits)
  KAFKA-12327: Remove MethodHandle usage in CompressionType (apache#10123)
  KAFKA-12297: Make MockProducer return RecordMetadata with values as per contract
  MINOR: Update zstd and use classes with no finalizers (apache#10120)
  KAFKA-12326: Corrected regresion in MirrorMaker 2 executable introduced with KAFKA-10021 (apache#10122)
  KAFKA-12321 the comparison function for uuid type should be 'equals' rather than '==' (apache#10098)
  MINOR: Add FetchSnapshot API doc in KafkaRaftClient (apache#10097)
  MINOR: KIP-631 KafkaConfig fixes and improvements (apache#10114)
  KAFKA-12272: Fix commit-interval metrics (apache#10102)
  MINOR: Improve confusing admin client shutdown logging (apache#10107)
  MINOR: Add BrokerMetadataListener (apache#10111)
  MINOR: Support Raft-based metadata quorums in system tests (apache#10093)
  MINOR: add the MetaLogListener, LocalLogManager, and Controller interface. (apache#10106)
  MINOR: Introduce the KIP-500 Broker lifecycle manager (apache#10095)
  MINOR: Remove always-passing validation in TestRecordTest#testProducerRecord (apache#9930)
  KAFKA-5235: GetOffsetShell: Support for multiple topics and consumer configuration override (KIP-635) (apache#9430)
  MINOR: Prevent creating partition.metadata until ID can be written (apache#10041)
  MINOR: Add RaftReplicaManager (apache#10069)
  MINOR: Add ClientQuotaMetadataManager for processing QuotaRecord (apache#10101)
  MINOR: Rename DecommissionBrokers to UnregisterBrokers (apache#10084)
  MINOR: KafkaBroker.brokerState should be volatile instead of AtomicReference (apache#10080)
  ...
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
The broker lifecycle manager handles registering the broker and sending periodic heartbeats, as described in KIP-631. Based on the responses it receives from the controller, it transitions the broker through the states described in BrokerState.java: STARTING, RECOVERY, RUNNING, PENDING_CONTROLLED_SHUTDOWN, etc.
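The response-driven transitions described above can be pictured with a small state machine sketch. The state names follow the ones listed in the description, but the `HeartbeatReply` fields and the transition conditions are assumptions made for illustration; they are not taken from the PR or from BrokerState.java.

```scala
object BrokerStateSketch {
  // Simplified states, named after the ones mentioned in the description.
  sealed trait State
  case object Starting extends State
  case object Recovery extends State
  case object Running extends State
  case object PendingControlledShutdown extends State
  case object ShuttingDown extends State

  // Hypothetical subset of what a controller heartbeat response might carry.
  final case class HeartbeatReply(isCaughtUp: Boolean, isFenced: Boolean, shouldShutDown: Boolean)

  // Assumed transition rules: each reply either advances the state or leaves it unchanged.
  def next(state: State, reply: HeartbeatReply): State = state match {
    case Starting if reply.isCaughtUp                      => Recovery
    case Recovery if !reply.isFenced                       => Running
    case PendingControlledShutdown if reply.shouldShutDown => ShuttingDown
    case other                                             => other
  }
}
```

Modeling the transition as a pure function of the current state and the reply makes it easy to unit test without a running event queue.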