-
Notifications
You must be signed in to change notification settings - Fork 28.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-49411][SS] Communicate CheckpointID between driver and stateful operators #47895
base: master
Are you sure you want to change the base?
Conversation
val isFirstBatch: Boolean) | ||
val isFirstBatch: Boolean, | ||
val currentCheckpointUniqueId: | ||
MutableMap[Long, Array[String]] = MutableMap[Long, Array[String]]()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add comments on what are these unique Ids map to? I believe key is operator Id?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also better name it currentStateUniqueId
as it is only related to state store not general checkpoint
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm also confused by this. When I sketched an implementation of your proposal in my head, my assumption would be that IncrementalExecution
would get just an ID, perhaps a single Long, that would correspond to the ID that it would bake into the physical plan sent to executors. So why is a map needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll add a comment, but it is basically operatorID->partitionID->checkpointID
private def updateCheckpointId( | ||
execCtx: MicroBatchExecutionContext, | ||
latestExecPlan: SparkPlan): Unit = { | ||
// This function cannot handle MBP now. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unnecessary comment
if (loadedVersion != version) { | ||
if (loadedVersion != version || | ||
(checkpointFormatVersion >= 2 && checkpointUniqueId.isDefined && | ||
(!loadedCheckpointId.isDefined || checkpointUniqueId.get != loadedCheckpointId.get))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: loadedCheckpointId.isEmpty
.agg(count("*")) | ||
.as[(Int, Long)] | ||
|
||
// Run the stream with changelog checkpointing disabled. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo?
// Store checkpointIDs for state store checkpoints to be committed or have been committed to | ||
// the commit log. | ||
// operatorID -> (partitionID -> uniqueID) | ||
private val currentCheckpointUniqueId = MutableMap[Long, Array[String]]() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe this is better to be put into the stream execution context
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
operatorID -> (partitionID -> uniqueID)
, is this supposed to mean a map of maps? If so, then why is the type of currentCheckpointUniqueId
just a single map?
I also don't fully understand why we would need a unique map for every operator X partition. Why is it not sufficient to have the following protocol, where we have one unique ID for every batch:
For the first batch, an ID is created and sent to all executors. When all tasks finish, that ID is persisted into the commit log. It is also kept in memory for the subsequent batch.
For any other batch, if there does not exist an ID in memory from the previous batch, then it must be read from the commit log and brought into memory. (This is the restart case.)
Then, using the ID in memory from the previous batch (call that prevId
), this is sent to all executors in the physical plan, as well as a new ID for the current batch (call this currId
). Before any processing start, executors must load and use the state for prevId
to process the current batch. Then, they can start processing, and they upload their state as <state file name>_currId.<changelog|snapshot>
.
What's wrong with that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right now, the uniqueID is generated in executor. As a potential optimization, the driver can send a uniqueID to all executors, but executors still need to modify it to make it unique among all attempts of the same task. After doing that, the IDs won't be unique anymore, so we need different IDs per partition.
try { | ||
if (version < 0) { | ||
throw QueryExecutionErrors.unexpectedStateStoreVersion(version) | ||
} | ||
rocksDB.load(version, true) | ||
rocksDB.load(version, uniqueId, true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rocksDB.load(
version,
if (storeConf.stateStoreCheckpointFormatVersion >= 2) uniqueId else None)
@volatile private var LastCommitBasedCheckpointId: Option[String] = None | ||
@volatile private var lastCommittedCheckpointId: Option[String] = None | ||
@volatile private var loadedCheckpointId: Option[String] = None | ||
@volatile private var sessionCheckpointId: Option[String] = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should reset these to None in rollback()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Going to stop reviewing since I have a few fundamental questions regarding the protocol.
@@ -105,7 +105,7 @@ class StreamStreamJoinStatePartitionReader( | |||
val stateInfo = StatefulOperatorStateInfo( | |||
partition.sourceOptions.stateCheckpointLocation.toString, | |||
partition.queryId, partition.sourceOptions.operatorId, | |||
partition.sourceOptions.batchId + 1, -1) | |||
partition.sourceOptions.batchId + 1, -1, None) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this None
? I would image that users of the state data source reader now have to specify the id
that they would like to read, given that state stores are now not uniquely identified by operator/partition/name, but by id/operator/partition/name?
val isFirstBatch: Boolean) | ||
val isFirstBatch: Boolean, | ||
val currentCheckpointUniqueId: | ||
MutableMap[Long, Array[String]] = MutableMap[Long, Array[String]]()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm also confused by this. When I sketched an implementation of your proposal in my head, my assumption would be that IncrementalExecution
would get just an ID, perhaps a single Long, that would correspond to the ID that it would bake into the physical plan sent to executors. So why is a map needed?
// Store checkpointIDs for state store checkpoints to be committed or have been committed to | ||
// the commit log. | ||
// operatorID -> (partitionID -> uniqueID) | ||
private val currentCheckpointUniqueId = MutableMap[Long, Array[String]]() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
operatorID -> (partitionID -> uniqueID)
, is this supposed to mean a map of maps? If so, then why is the type of currentCheckpointUniqueId
just a single map?
I also don't fully understand why we would need a unique map for every operator X partition. Why is it not sufficient to have the following protocol, where we have one unique ID for every batch:
For the first batch, an ID is created and sent to all executors. When all tasks finish, that ID is persisted into the commit log. It is also kept in memory for the subsequent batch.
For any other batch, if there does not exist an ID in memory from the previous batch, then it must be read from the commit log and brought into memory. (This is the restart case.)
Then, using the ID in memory from the previous batch (call that prevId
), this is sent to all executors in the physical plan, as well as a new ID for the current batch (call this currId
). Before any processing start, executors must load and use the state for prevId
to process the current batch. Then, they can start processing, and they upload their state as <state file name>_currId.<changelog|snapshot>
.
What's wrong with that?
val ret = StatefulOperatorStateInfo( | ||
checkpointLocation, | ||
runId, | ||
statefulOperatorId.getAndIncrement(), | ||
operatorId, | ||
currentBatchId, | ||
numStateStores) | ||
numStateStores, | ||
currentCheckpointUniqueId.get(operatorId)) | ||
ret |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ret
is not needed
case e: StreamingDeduplicateWithinWatermarkExec => | ||
assert(e.stateInfo.isDefined) | ||
updateCheckpointIdForOperator(execCtx, e.stateInfo.get.operatorId, e.getCheckpointInfo()) | ||
// TODO Need to deal with FlatMapGroupsWithStateExec, TransformWithStateExec, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not?
And I also don't see why we need to enumerate all of these here. Can we leverage the StatefulOperator
trait and use that to get the state info? It should clean this up quite a bit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You will, though, probably have to do some work to make sure that getCheckpointInfo
can be called for any stateful operator.
watermarkTracker.updateWatermark(execCtx.executionPlan.executedPlan) | ||
val latestExecPlan = execCtx.executionPlan.executedPlan | ||
watermarkTracker.updateWatermark(latestExecPlan) | ||
if (sparkSession.sessionState.conf.stateStoreCheckpointFormatVersion >= 2) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't really like the >= 2
sprinkled everywhere. Can you define a constant somewhere, and then have a utility method that you call
val isFirstBatch: Boolean) | ||
val isFirstBatch: Boolean, | ||
val currentCheckpointUniqueId: | ||
MutableMap[Long, Array[String]] = MutableMap[Long, Array[String]]()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it always true that partition IDs are always [0, numPartitions)
?
}) | ||
} | ||
|
||
private def updateCheckpointId( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me make sure I understand the flow here:
- Micro-batch ends, we call
updateCheckpointId
- This goes through every stateful operator and calls
updateCheckpointIdForOperator
- For each operator, we call into its
getCheckpointInfo
method- That method will access the
checkpointInfoAccumulator
- The
checkpointInfoAccumulator
is appended to using the unique ID from the state store after processing all data on the task
- That method will access the
- In the future, we'll write this to the commit log.
Is this right?
@@ -803,6 +843,14 @@ class RocksDB( | |||
/** Get the write buffer manager and cache */ | |||
def getWriteBufferManagerAndCache(): (WriteBufferManager, Cache) = (writeBufferManager, lruCache) | |||
|
|||
def getLatestCheckpointInfo(partitionId: Int): StateStoreCheckpointInfo = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this ever be called if lastCommittedCheckpointId
is None
or LastCommitBasedCheckpointId
is None
?
// variables to manage checkpoint ID. Once a checkpoingting finishes, it nees to return | ||
// the `lastCommittedCheckpointId` as the committed checkpointID, as well as | ||
// `LastCommitBasedCheckpointId` as the checkpontID of the previous version that is based on. | ||
// `loadedCheckpointId` is the checkpointID for the current live DB. After the batch finishes | ||
// and checkpoint finishes, it will turn into `LastCommitBasedCheckpointId`. | ||
// `sessionCheckpointId` store an ID to be used for future checkpoints. It is kept being used | ||
// until we have to use a new one. We don't need to reuse any uniqueID, but reusing when possible | ||
// can help debug problems. | ||
@volatile private var LastCommitBasedCheckpointId: Option[String] = None | ||
@volatile private var lastCommittedCheckpointId: Option[String] = None | ||
@volatile private var loadedCheckpointId: Option[String] = None | ||
@volatile private var sessionCheckpointId: Option[String] = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We never read sessionCheckpointId
and the comment doesn't really help me. What is it being used for?
Is there a reason LastCommitBasedCheckpointId
is capitalized? And LastCommitBasedCheckpointId
isn't even used in this PR since there is another TODO that says // TODO validate baseCheckpointId
? Is that right?
@volatile private var LastCommitBasedCheckpointId: Option[String] = None | ||
@volatile private var lastCommittedCheckpointId: Option[String] = None | ||
@volatile private var loadedCheckpointId: Option[String] = None | ||
@volatile private var sessionCheckpointId: Option[String] = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you comment specifically why these are marked as volatile
? From what I can tell, these are only read/written to by the query execution thread.
partitionId: Int, | ||
batchVersion: Long, | ||
checkpointId: Option[String], | ||
baseCheckpointId: Option[String]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We call this checkpointId
in some places and baseCheckpointId
in others? Can you clarify which is which, and what specifically it should be here?
.map { | ||
case (key, values) => key -> values.head | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This list would be non-zero only if there was a task retry/speculative execution, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And as discussed earlier today offline, this has the issue of not working if the same partition has multiple state stores, e.g. in a stream-stream join, which is actually a very serious issue.
What changes were proposed in this pull request?
This is an incremental step to implement RocksDB state store checkpoint format V2.
Once conf STATE_STORE_CHECKPOINT_FORMAT_VERSION is set to be higher than version 2, the executor returns checkpointID to the driver (only done for RocksDB). The driver stores is locally. For the next batch, the checkpointID is sent to the executor to be used to load the state store. If the local version of the executor doesn't match the uniqueID, it will reload from the checkpoint.
There is no behavior change if the default checkpoint format is used.
Why are the changes needed?
This is an incremental step of the project of a new RocksDB State Store checkpoint format. The new format is to simplify checkpoint mechanism to make it less bug prone, and fix some unexpected query results in rare queries.
Does this PR introduce any user-facing change?
No
How was this patch tested?
A new unit test is added to cover format version. And another unit test is added to validate the uniqueID is passed back and force as expected.
Was this patch authored or co-authored using generative AI tooling?
No