
[SPARK-49411][SS] Communicate CheckpointID between driver and stateful operators #47895

Open · wants to merge 11 commits into master

Conversation

@siying (Contributor) commented Aug 27, 2024

What changes were proposed in this pull request?

This is an incremental step to implement RocksDB state store checkpoint format V2.

Once the conf STATE_STORE_CHECKPOINT_FORMAT_VERSION is set to version 2 or higher, the executor returns the checkpointID to the driver (only done for RocksDB). The driver stores it locally. For the next batch, the checkpointID is sent to the executor to be used to load the state store. If the executor's locally loaded state doesn't match the uniqueID, it reloads from the checkpoint.

There is no behavior change if the default checkpoint format is used.
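For context, a minimal sketch of how a query would opt into the new format; the conf key shown is an assumption based on the STATE_STORE_CHECKPOINT_FORMAT_VERSION name above and may differ from the final implementation:

// Hypothetical opt-in; per this PR it only has an effect for RocksDB-backed state stores.
spark.conf.set("spark.sql.streaming.stateStore.checkpointFormatVersion", "2")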

Why are the changes needed?

This is an incremental step in the project to introduce a new RocksDB state store checkpoint format. The new format simplifies the checkpoint mechanism to make it less bug-prone, and fixes some unexpected query results in rare queries.

Does this PR introduce any user-facing change?

No

How was this patch tested?

A new unit test is added to cover the format version, and another unit test is added to validate that the uniqueID is passed back and forth as expected.

Was this patch authored or co-authored using generative AI tooling?

No

@siying siying marked this pull request as draft August 27, 2024 18:33
@siying siying changed the title [WIP] Communicate CheckpointID between driver and stateful operators [SPARK-49411][SS] Communicate CheckpointID between driver and stateful operators Aug 27, 2024
@siying siying marked this pull request as ready for review August 27, 2024 22:22
val isFirstBatch: Boolean)
val isFirstBatch: Boolean,
val currentCheckpointUniqueId:
MutableMap[Long, Array[String]] = MutableMap[Long, Array[String]]())
@WweiL (Contributor), Aug 29, 2024

Can we add a comment on what these unique IDs map to? I believe the key is the operator ID?

Contributor

Also, better to name it currentStateUniqueId, as it is only related to the state store, not checkpointing in general.

Contributor

I'm also confused by this. When I sketched an implementation of your proposal in my head, my assumption would be that IncrementalExecution would get just an ID, perhaps a single Long, that would correspond to the ID that it would bake into the physical plan sent to executors. So why is a map needed?

Contributor Author

I'll add a comment, but it is basically operatorID->partitionID->checkpointID
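A minimal sketch of how that layered mapping fits into the single MutableMap[Long, Array[String]] shown above, with the partition ID as the index into the array (the lookup helper is hypothetical):

import scala.collection.mutable.{Map => MutableMap}

object CheckpointIdMapSketch {
  // operatorID -> (partitionID -> checkpointID); the inner map is flattened into
  // an array indexed by partition ID.
  val currentCheckpointUniqueId: MutableMap[Long, Array[String]] =
    MutableMap[Long, Array[String]]()

  // Hypothetical lookup helper for one operator/partition pair.
  def checkpointIdFor(operatorId: Long, partitionId: Int): Option[String] =
    currentCheckpointUniqueId.get(operatorId).map(_(partitionId))
}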

private def updateCheckpointId(
execCtx: MicroBatchExecutionContext,
latestExecPlan: SparkPlan): Unit = {
// This function cannot handle MBP now.
Contributor

unnecessary comment

if (loadedVersion != version) {
if (loadedVersion != version ||
(checkpointFormatVersion >= 2 && checkpointUniqueId.isDefined &&
(!loadedCheckpointId.isDefined || checkpointUniqueId.get != loadedCheckpointId.get))) {
Contributor

nit: loadedCheckpointId.isEmpty
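Applying the nit, the condition from the diff would read roughly as follows (identifiers reused from the surrounding code; a sketch, not the final wording):

if (loadedVersion != version ||
    (checkpointFormatVersion >= 2 && checkpointUniqueId.isDefined &&
      (loadedCheckpointId.isEmpty || checkpointUniqueId.get != loadedCheckpointId.get))) {
  // reload the state store from the checkpoint
}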

.agg(count("*"))
.as[(Int, Long)]

// Run the stream with changelog checkpointing disabled.
Contributor

typo?

// Store checkpointIDs for state store checkpoints to be committed or have been committed to
// the commit log.
// operatorID -> (partitionID -> uniqueID)
private val currentCheckpointUniqueId = MutableMap[Long, Array[String]]()
Contributor

Maybe this is better placed in the stream execution context.

Contributor

operatorID -> (partitionID -> uniqueID), is this supposed to mean a map of maps? If so, then why is the type of currentCheckpointUniqueId just a single map?

I also don't fully understand why we would need a unique map for every operator X partition. Why is it not sufficient to have the following protocol, where we have one unique ID for every batch:

For the first batch, an ID is created and sent to all executors. When all tasks finish, that ID is persisted into the commit log. It is also kept in memory for the subsequent batch.

For any other batch, if there does not exist an ID in memory from the previous batch, then it must be read from the commit log and brought into memory. (This is the restart case.)

Then, using the ID in memory from the previous batch (call that prevId), this is sent to all executors in the physical plan, as well as a new ID for the current batch (call this currId). Before any processing starts, executors must load and use the state for prevId to process the current batch. Then, they can start processing, and they upload their state as <state file name>_currId.<changelog|snapshot>.

What's wrong with that?
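For concreteness, a compressed sketch of the single-ID-per-batch protocol described above; every name here is a hypothetical stand-in, not code from this PR:

import java.util.UUID
import scala.collection.mutable

object SingleIdPerBatchSketch {
  private val commitLog = mutable.Map[Long, String]()   // batchId -> checkpoint ID
  private var lastIdInMemory: Option[String] = None     // ID kept from the previous batch
  private def sendToExecutors(prevId: Option[String], currId: String): Unit = ()

  def runBatch(batchId: Long): Unit = {
    // Restart case: nothing in memory, so recover the previous batch's ID from the commit log.
    val prevId = lastIdInMemory.orElse(commitLog.get(batchId - 1))

    // One new ID for the whole batch, baked into the physical plan sent to executors.
    val currId = UUID.randomUUID().toString
    sendToExecutors(prevId, currId) // executors load prevId state, write files suffixed with currId

    // Once all tasks finish, persist the ID and keep it in memory for the next batch.
    commitLog.put(batchId, currId)
    lastIdInMemory = Some(currId)
  }
}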

Contributor Author

Right now, the uniqueID is generated in the executor. As a potential optimization, the driver could send a uniqueID to all executors, but executors would still need to modify it to make it unique among all attempts of the same task. After doing that, the IDs are no longer identical across tasks, so we need a different ID per partition.
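A rough illustration of that point, with a hypothetical derivation just to show why the final IDs end up differing per partition and per attempt:

import java.util.UUID

object ExecutorCheckpointIdSketch {
  // Base ID the driver could propose once per batch (the potential optimization above).
  val driverBaseId: String = UUID.randomUUID().toString

  // Each task attempt still has to make the ID its own, e.g. by mixing in partition and
  // attempt information, so the resulting IDs differ per partition.
  def executorCheckpointId(partitionId: Int, attemptNumber: Int): String =
    s"$driverBaseId-$partitionId-$attemptNumber-${UUID.randomUUID()}"
}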

try {
if (version < 0) {
throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
}
rocksDB.load(version, true)
rocksDB.load(version, uniqueId, true)
Contributor

rocksDB.load(
  version,
  if (storeConf.stateStoreCheckpointFormatVersion >= 2) uniqueId else None)

@volatile private var LastCommitBasedCheckpointId: Option[String] = None
@volatile private var lastCommittedCheckpointId: Option[String] = None
@volatile private var loadedCheckpointId: Option[String] = None
@volatile private var sessionCheckpointId: Option[String] = None
Contributor

Should reset these to None in rollback()
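A sketch of what that reset could look like inside RocksDB.rollback(), reusing the fields from the diff (the existing rollback body is elided):

def rollback(): Unit = {
  // ... existing rollback logic ...
  // Reset checkpoint ID tracking so a rolled-back store does not report stale IDs.
  LastCommitBasedCheckpointId = None
  lastCommittedCheckpointId = None
  loadedCheckpointId = None
  sessionCheckpointId = None
}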

@neilramaswamy (Contributor) commented Sep 10, 2024

fix some unexpected query results in rare queries

@siying can you provide some context about which situations these are, specifically?

(Edit, seems to be here in the design doc.)

@neilramaswamy (Contributor) left a comment

Going to stop reviewing since I have a few fundamental questions regarding the protocol.

@@ -105,7 +105,7 @@ class StreamStreamJoinStatePartitionReader(
val stateInfo = StatefulOperatorStateInfo(
partition.sourceOptions.stateCheckpointLocation.toString,
partition.queryId, partition.sourceOptions.operatorId,
partition.sourceOptions.batchId + 1, -1)
partition.sourceOptions.batchId + 1, -1, None)
Contributor

Why is this None? I would imagine that users of the state data source reader now have to specify the ID that they would like to read, given that state stores are no longer uniquely identified by operator/partition/name, but by id/operator/partition/name?



Comment on lines 134 to 144
val ret = StatefulOperatorStateInfo(
checkpointLocation,
runId,
statefulOperatorId.getAndIncrement(),
operatorId,
currentBatchId,
numStateStores)
numStateStores,
currentCheckpointUniqueId.get(operatorId))
ret
Contributor

ret is not needed

case e: StreamingDeduplicateWithinWatermarkExec =>
assert(e.stateInfo.isDefined)
updateCheckpointIdForOperator(execCtx, e.stateInfo.get.operatorId, e.getCheckpointInfo())
// TODO Need to deal with FlatMapGroupsWithStateExec, TransformWithStateExec,
Contributor

Why not?

And I also don't see why we need to enumerate all of these here. Can we leverage the StatefulOperator trait and use that to get the state info? It should clean this up quite a bit.

Contributor

You will, though, probably have to do some work to make sure that getCheckpointInfo can be called for any stateful operator.
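A sketch of the suggested cleanup, assuming getCheckpointInfo() becomes callable on every stateful operator (collected here via the StateStoreWriter trait) and reusing the names from the diff:

latestExecPlan.collect {
  case stateful: StateStoreWriter if stateful.stateInfo.isDefined =>
    updateCheckpointIdForOperator(
      execCtx, stateful.stateInfo.get.operatorId, stateful.getCheckpointInfo())
}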

watermarkTracker.updateWatermark(execCtx.executionPlan.executedPlan)
val latestExecPlan = execCtx.executionPlan.executedPlan
watermarkTracker.updateWatermark(latestExecPlan)
if (sparkSession.sessionState.conf.stateStoreCheckpointFormatVersion >= 2) {
Contributor

I don't really like the >= 2 sprinkled everywhere. Can you define a constant somewhere, and then have a utility method that you call instead?
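One possible shape for that helper (names are hypothetical):

object StateStoreCheckpointIds {
  // Single home for the minimum format version that carries checkpoint IDs.
  val MinVersionWithCheckpointIds = 2

  def enabled(checkpointFormatVersion: Int): Boolean =
    checkpointFormatVersion >= MinVersionWithCheckpointIds
}

// Call sites then become:
//   if (StateStoreCheckpointIds.enabled(conf.stateStoreCheckpointFormatVersion)) { ... }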

val isFirstBatch: Boolean)
val isFirstBatch: Boolean,
val currentCheckpointUniqueId:
MutableMap[Long, Array[String]] = MutableMap[Long, Array[String]]())
Contributor

Is it always true that partition IDs are in [0, numPartitions)?

})
}

private def updateCheckpointId(
Contributor

Let me make sure I understand the flow here:

  1. Micro-batch ends, we call updateCheckpointId
  2. This goes through every stateful operator and calls updateCheckpointIdForOperator
  3. For each operator, we call into its getCheckpointInfo method
    1. That method will access the checkpointInfoAccumulator
    2. The checkpointInfoAccumulator is appended to using the unique ID from the state store after processing all data on the task
  4. In the future, we'll write this to the commit log.

Is this right?
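If that reading is right, the driver-side aggregation in step 3 might look roughly like this sketch; it reuses the StateStoreCheckpointInfo fields shown later in this diff, while the helper signature and the numPartitions argument are assumptions:

import scala.collection.mutable.{Map => MutableMap}

// Fields follow the case class shown later in this diff.
case class StateStoreCheckpointInfo(
    partitionId: Int,
    batchVersion: Long,
    checkpointId: Option[String],
    baseCheckpointId: Option[String])

object CheckpointIdAggregationSketch {
  val currentCheckpointUniqueId = MutableMap[Long, Array[String]]()

  // Fold the per-task accumulator contents into operatorID -> (partitionID -> uniqueID)
  // after the micro-batch finishes.
  def updateCheckpointIdForOperator(
      operatorId: Long,
      checkpointInfo: Seq[StateStoreCheckpointInfo],
      numPartitions: Int): Unit = {
    val ids = new Array[String](numPartitions)
    checkpointInfo.foreach { info =>
      info.checkpointId.foreach(id => ids(info.partitionId) = id)
    }
    currentCheckpointUniqueId.put(operatorId, ids)
  }
}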

@@ -803,6 +843,14 @@ class RocksDB(
/** Get the write buffer manager and cache */
def getWriteBufferManagerAndCache(): (WriteBufferManager, Cache) = (writeBufferManager, lruCache)

def getLatestCheckpointInfo(partitionId: Int): StateStoreCheckpointInfo = {
Contributor

Will this ever be called if lastCommittedCheckpointId is None or LastCommitBasedCheckpointId is None?

Comment on lines 163 to 174
// variables to manage checkpoint ID. Once a checkpointing finishes, it needs to return
// the `lastCommittedCheckpointId` as the committed checkpointID, as well as
// `LastCommitBasedCheckpointId` as the checkpointID of the previous version that it is based on.
// `loadedCheckpointId` is the checkpointID for the current live DB. After the batch finishes
// and checkpoint finishes, it will turn into `LastCommitBasedCheckpointId`.
// `sessionCheckpointId` stores an ID to be used for future checkpoints. It is kept being used
// until we have to use a new one. We don't need to reuse any uniqueID, but reusing when possible
// can help debug problems.
@volatile private var LastCommitBasedCheckpointId: Option[String] = None
@volatile private var lastCommittedCheckpointId: Option[String] = None
@volatile private var loadedCheckpointId: Option[String] = None
@volatile private var sessionCheckpointId: Option[String] = None
Contributor

We never read sessionCheckpointId and the comment doesn't really help me. What is it being used for?

Is there a reason LastCommitBasedCheckpointId is capitalized? And LastCommitBasedCheckpointId isn't even used in this PR since there is another TODO that says // TODO validate baseCheckpointId? Is that right?

Comment on lines 171 to 174
@volatile private var LastCommitBasedCheckpointId: Option[String] = None
@volatile private var lastCommittedCheckpointId: Option[String] = None
@volatile private var loadedCheckpointId: Option[String] = None
@volatile private var sessionCheckpointId: Option[String] = None
Contributor

Can you comment specifically why these are marked as volatile? From what I can tell, these are only read/written to by the query execution thread.

partitionId: Int,
batchVersion: Long,
checkpointId: Option[String],
baseCheckpointId: Option[String])
Contributor

We call this checkpointId in some places and baseCheckpointId in others? Can you clarify which is which, and what specifically it should be here?

Comment on lines +205 to +241
.map {
case (key, values) => key -> values.head
}
Contributor

This list would be non-zero only if there was a task retry/speculative execution, right?

@neilramaswamy (Contributor), Sep 17, 2024

And as discussed earlier today offline, this has the issue of not working if the same partition has multiple state stores, e.g. in a stream-stream join, which is actually a very serious issue.
