MINOR: Introduce the KIP-500 Broker lifecycle manager #10095

Merged: 7 commits, Feb 11, 2021

Contributor

@cmccabe cmccabe commented Feb 10, 2021

The broker lifecycle manager handles registering the broker and sending periodic heartbeats, as described in KIP-631. Based on the responses it receives from the controller, it transitions the broker through the states described in BrokerState.java: STARTING, RECOVERY, RUNNING, PENDING_CONTROLLED_SHUTDOWN, etc.

The BrokerLifecycleManager handles broker state transitions for the
KIP-500 broker.  This includes sending broker registrations, heartbeats,
and controlled shutdown requests.
@cmccabe cmccabe added the kraft label Feb 10, 2021
Contributor

@ijuma ijuma left a comment

Thanks for the PR. A few comments below. Also, do you have a reference to how this class is used?

/**
* How long to wait for registration to succeed before failing the startup process.
*/
private val initialTimeoutNs = NANOSECONDS.
Contributor

Nit: Isn't TimeUnit.MILLISECONDS.toNanos(config.initialRegistrationTimeoutMs) more readable?
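
For comparison, here is a small sketch of the two spellings using `java.util.concurrent.TimeUnit`. The value 60000 is an arbitrary stand-in for `config.initialRegistrationTimeoutMs`, and the object name is made up for illustration.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeUnit.{MILLISECONDS, NANOSECONDS}

object TimeoutConversion {
  // Hypothetical stand-in for config.initialRegistrationTimeoutMs.
  val initialRegistrationTimeoutMs: Int = 60000

  // Target-unit-first form: "convert this many MILLISECONDS into NANOSECONDS".
  val viaConvert: Long =
    NANOSECONDS.convert(initialRegistrationTimeoutMs.toLong, MILLISECONDS)

  // Source-unit-first form suggested in the review.
  val viaToNanos: Long =
    TimeUnit.MILLISECONDS.toNanos(initialRegistrationTimeoutMs.toLong)
}
```

Both expressions produce the same number of nanoseconds; the difference is only in which unit the reader sees first.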

* The exponential backoff to use for resending communication.
*/
private val resendExponentialBackoff =
new ExponentialBackoff(100, 2, config.brokerSessionTimeoutMs.toLong, 0.1)
Contributor

We seem to use 0.2 as jitter elsewhere. Any reason to deviate? Maybe we should have an ExponentialBackoff static factory method that only takes the initial interval and the max interval. It seems that the other two parameters can be the same most of the time.

Contributor Author

@cmccabe cmccabe Feb 10, 2021

20% jitter is kind of a lot for the heartbeat, I think. 20% means we could be heartbeating every 2.4 s rather than 2 s.

Thinking about it more I'd be more comfortable with 2%. The point of jitter is just to avoid having a lot of heartbeats come in all at once, and I think 2% jitter is more than sufficient to accomplish that. With a very small injection of randomness each time the heartbeat times will drift apart naturally, without the need for big divergences.
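
The arithmetic behind the 20% versus 2% comparison can be sketched as follows. This assumes symmetric jitter, where the base delay is scaled by a random factor in [1 - jitter, 1 + jitter); it is a standalone illustration with made-up names, not the `ExponentialBackoff` class used in the PR.

```scala
import java.util.concurrent.ThreadLocalRandom

object JitterSketch {
  /** Lowest and highest delay that symmetric jitter can produce around baseMs. */
  def jitterBounds(baseMs: Long, jitter: Double): (Long, Long) =
    (math.round(baseMs * (1.0 - jitter)), math.round(baseMs * (1.0 + jitter)))

  /** One randomized delay: baseMs scaled by a factor in [1 - jitter, 1 + jitter). */
  def jitteredDelayMs(baseMs: Long, jitter: Double): Long = {
    require(jitter > 0.0, "jitter must be positive in this sketch")
    val factor = 1.0 + ThreadLocalRandom.current().nextDouble(-jitter, jitter)
    math.round(baseMs * factor)
  }
}
```

With a 2000 ms base, `jitterBounds(2000L, 0.2)` spans 1600 to 2400 ms, while `jitterBounds(2000L, 0.02)` spans 1960 to 2040 ms, which is the narrower spread argued for above.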

import scala.jdk.CollectionConverters._


class BrokerLifecycleManager(val config: KafkaConfig,
Contributor

Worth adding a comment explaining the thread safety aspects of this class and also its general purpose. I see a few volatile fields, but failedAttempts is not volatile (for example).

Contributor

I wonder if it might be helpful to assign each var into one of two buckets: stuff that can only be written to from the event queue thread but that can be read from any thread (must be @volatile), and stuff that is only used from within the event queue (doesn't need to be). At a minimum put these into different commented sections, but maybe even create a single container object for the @volatile ones:

  private case class EventQueueThreadOwnedVars(@volatile var _brokerEpoch: Long = -1L,
                                               @volatile var _state: BrokerState = BrokerState.NOT_RUNNING)
  val eventQueueThreadOwnedVars = EventQueueThreadOwnedVars()

Contributor Author

I added some JavaDoc for the class as a whole explaining the overall paradigm.

I think putting all the mutable state into a separate class is a bit too extreme. I added notes about how each mutable variable should be used, however.
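
A minimal sketch of the two buckets discussed in this thread, with each variable annotated by how it may be used. A single-threaded executor stands in for the event queue thread, and the class and method names are hypothetical; only `_brokerEpoch` and `failedAttempts` are names taken from the discussion.

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Illustrative only: a single-threaded executor plays the role of the event queue thread.
class OwnershipSketch {
  private val eventThread = Executors.newSingleThreadExecutor()

  // Written only on the event thread, read from any thread: must be @volatile.
  @volatile private var _brokerEpoch: Long = -1L

  // Read and written only on the event thread: a plain var is enough.
  private var failedAttempts: Int = 0

  // Safe to call from any thread because the field is volatile.
  def brokerEpoch: Long = _brokerEpoch

  // Hypothetical event: record a failed communication attempt.
  def onFailure(): Unit = eventThread.execute(() => failedAttempts += 1)

  // Hypothetical event: accept a new epoch and reset the failure counter.
  def onRegistered(epoch: Long): Unit = eventThread.execute { () =>
    failedAttempts = 0
    _brokerEpoch = epoch
  }

  // Stop accepting events and wait briefly for queued ones to finish.
  def close(): Unit = {
    eventThread.shutdown()
    eventThread.awaitTermination(5, TimeUnit.SECONDS)
  }
}
```

All mutation is funneled through the one thread, so only the fields exposed to outside readers pay the cost of `@volatile`.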

channelManager: BrokerToControllerChannelManager,
clusterId: Uuid,
advertisedListeners: ListenerCollection,
supportedFeatures: util.Map[String, VersionRange]): Unit = {
Contributor

Why are we using a Java collection?

Contributor Author

supportedFeatures is a static final map which basically explains what features the software supports. So the intention is for it to be defined in common code which happens to be written in Java (not included in this PR). It seemed simpler not to translate the map into Scala, although I guess it's not too difficult either way.
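
If the translation were wanted, it could look roughly like the sketch below. The map contents are invented, and `Integer` replaces `VersionRange` as the value type so the example stays self-contained.

```scala
import java.util
import scala.jdk.CollectionConverters._

object FeatureMapSketch {
  // Hypothetical stand-in for a map defined in shared Java code.
  val javaFeatures: util.Map[String, Integer] = {
    val m = new util.HashMap[String, Integer]()
    m.put("example.feature", 1)
    util.Collections.unmodifiableMap(m)
  }

  // asScala wraps the Java map without copying; toMap makes an immutable Scala copy.
  val scalaFeatures: Map[String, Integer] = javaFeatures.asScala.toMap
}
```

Passing the Java map through unchanged avoids the copy, which is the simpler option the author describes.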


def state(): BrokerState = _state

class BeginControlledShutdownEvent extends EventQueue.Event {
Contributor

Are these events meant to be public?

Contributor Author

That's a good point: these can all be private. Will fix.

@@ -1497,6 +1509,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
var brokerId: Int = getInt(KafkaConfig.BrokerIdProp)
val nodeId: Int = getInt(KafkaConfig.NodeIdProp)
val processRoles: Set[ProcessRole] = parseProcessRoles()
val initialRegistrationTimeoutMs = getInt(KafkaConfig.InitialBrokerRegistrationTimeoutMs)
val brokerHeartbeatIntervalMs = getInt(KafkaConfig.BrokerHeartbeatIntervalMsProp)
val brokerSessionTimeoutMs = getInt(KafkaConfig.BrokerSessionTimeoutMsProp)
Contributor

It's worth including the explicit type here since it's a public val. Are these meant to be nullable? It may be better to make them Option[Int] if so. Otherwise, we should make them Int so that they're easier to use (they're currently being inferred as Integer).

Contributor Author

@cmccabe cmccabe Feb 10, 2021

Good point. I added the explicit type of Int.

I don't think they need to be nullable... they have reasonable defaults as usual in KafkaConfig. And they will be ignored altogether when in ZK mode, of course.
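
The inference issue raised here can be reproduced in isolation. In this sketch, `getInt` is a hypothetical stand-in for a Java-style getter that returns a boxed `java.lang.Integer`; the key string is a placeholder.

```scala
object InferredTypeSketch {
  // Hypothetical stand-in for a getter that returns a boxed Integer.
  def getInt(key: String): java.lang.Integer = Integer.valueOf(2000)

  // Without an annotation, the val is inferred as java.lang.Integer.
  val inferred = getInt("example.key")

  // With an explicit type, Scala's built-in Integer-to-Int conversion unboxes the value.
  val explicit: Int = getInt("example.key")
}
```

Callers of `explicit` get a primitive `Int` and never have to consider a null boxed value.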

Contributor

@mumrah mumrah left a comment

Thanks @cmccabe, this looks nice. I like the event driven nature of this manager along with the state machine 👍

One thing I don't love is how BrokerLifecycleManager can be instantiated but some of the members are not initialized until start is called. Is there no way we can defer initialization until all the dependencies of this class are available?

// schedule our next heartbeat a little bit sooner than we usually would.
// In the case where controlled shutdown completes quickly, this will
// speed things up a little bit.
scheduleNextCommunication(NANOSECONDS.convert(50, MILLISECONDS))
Contributor

Since the heartbeat interval is configurable, maybe we should calculate this from the interval instead of a fixed value. Maybe something like (interval / 2) or sqrt(interval)?

Contributor Author

Hmm. I'm not sure. Even if the heartbeat interval was 1 second or 3 seconds (just for example) I don't think we'd want this interval to change.

Contributor

Yea, fair enough. Maybe we can at least consolidate on one magic "short" time and make it a constant? Now we have 10ms and 50ms hard coded.

override def run(): Unit = {
_highestMetadataOffsetProvider = highestMetadataOffsetProvider
_channelManager = channelManager
_channelManager.start()
Contributor

If this class is owning the lifecycle for _channelManager, should it not also instantiate it?

Contributor Author

We do it this way to make unit tests easier, since we need to supply a mock channel manager at times.

Contributor Author

I guess we could have the responsibility for closing the channel manager lie with the external callers. Would that be better here? It would be clearer, I guess.
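
The trade-off in this thread can be sketched with a hypothetical, much smaller interface than the real `BrokerToControllerChannelManager`. All names below are invented for illustration; the sketch shows the variant in which the caller, not the manager, closes the channel manager.

```scala
// Hypothetical minimal interface; the real channel manager is richer.
trait ChannelManager {
  def start(): Unit
  def shutdown(): Unit
}

// A test double that only records which calls were made.
class RecordingChannelManager extends ChannelManager {
  var started = false
  var stopped = false
  override def start(): Unit = started = true
  override def shutdown(): Unit = stopped = true
}

// The manager receives its collaborator instead of constructing it,
// so a test can pass in a double. Shutdown stays with the caller.
class ManagerSketch(channelManager: ChannelManager) {
  def start(): Unit = channelManager.start()
}
```

A test can then construct `new ManagerSketch(new RecordingChannelManager)` and inspect the flags, without any network setup.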

new ExponentialBackoff(100, 2, config.brokerSessionTimeoutMs.toLong, 0.1)

/**
* The number of tries we've tried to communicate.
Contributor

nit: append " and failed" or s/tried/failed/

private var gotControlledShutdownResponse = false

/**
* Whether or not we this broker is registered with the controller quorum.
Contributor

nit: s/we this/this/

override def run(): Unit = {
_state match {
case BrokerState.PENDING_CONTROLLED_SHUTDOWN =>
info(s"Attempted to enter controlled shutdown state, but we are already in " +
Contributor

nit: s/controlled/pending controlled/

Comment on lines 48 to 51
/**
* The broker rack, or null if there is no configured rack.
*/
private val rack = config.rack.orNull
Contributor

Make an Option[String]? No need to propagate the unfortunate fact that rack can be null elsewhere?
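
One way to read this suggestion: keep the value as an `Option[String]` inside Scala code and call `orNull` only where a Java-facing API needs it. The `describe` function and the rack values below are hypothetical.

```scala
object RackSketch {
  // Hypothetical Java-facing sink that expects null when no rack is set.
  def describe(rack: String): String =
    if (rack == null) "no rack" else s"rack=$rack"

  // Keep the value as an Option inside Scala code.
  val configuredRack: Option[String] = Some("rack-a")
  val missingRack: Option[String] = None

  // Convert to null only at the boundary that requires it.
  val a: String = describe(configuredRack.orNull)
  val b: String = describe(missingRack.orNull)
}
```

This keeps the possibility of a missing rack visible in the type everywhere except the single call site that needs a nullable string.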

manager.controlledShutdownFuture.get()
manager.close()
}
}
Contributor

nit: missing newline

Contributor Author

cmccabe commented Feb 10, 2021

Thanks for the reviews, @mumrah!

> One thing I don't love is how BrokerLifecycleManager can be instantiated but some of the members are not initialized until start is called. Is there no way we can defer initialization until all the dependencies of this class are available?

I think the next PR in the series makes things clearer. The BrokerServer uses BrokerLifecycleManager to manage the broker state at all times, not just after the broker has been started or before it has been stopped. Although there is a little bit of extra complexity in the lifecycle manager to support this usage, it makes the surrounding code much simpler because we can rely on this component to always have the state. Therefore we don't need more mutable state and if statements in BrokerServer.

Contributor

@mumrah mumrah left a comment

Thanks for the explanation on the initialization/startup thing @cmccabe. I think this LGTM 👍

@cmccabe cmccabe merged commit 6b3a455 into apache:trunk Feb 11, 2021
cmccabe added a commit that referenced this pull request Feb 11, 2021
Add the KIP-500 broker lifecycle manager.  It owns the broker state.  Its inputs are
messages passed in from other parts of the broker and from the controller: requests to start
up, or shut down, for example. Its outputs are the broker state and various futures that can
be used to wait for broker state transitions to occur.

The lifecycle manager handles registering the broker with the controller, as described in
KIP-631. After registration is complete, it handles sending periodic broker heartbeats and
processing the responses.

Reviewers: David Arthur <mumrah@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Ron Dagostino <rdagostino@confluent.io>