Skip to content

Commit

Permalink
comments
Browse files Browse the repository at this point in the history
  • Loading branch information
siying committed Aug 27, 2024
1 parent 01297b6 commit cb9a5ab
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ class MicroBatchExecution(

protected var watermarkTracker: WatermarkTracker = _

// Stores the checkpoint IDs of state store checkpoints that are about to be committed, or have
// already been committed, to the commit log.
// operatorID -> (partitionID -> uniqueID)
private val currentCheckpointUniqueId = MutableMap[Long, Array[String]]()

override lazy val logicalPlan: LogicalPlan = {
Expand Down Expand Up @@ -910,7 +913,7 @@ class MicroBatchExecution(
execCtx.batchId == -1 || v == execCtx.batchId + 1,
s"version $v doesn't match current Batch ID ${execCtx.batchId}")
}
currentCheckpointUniqueId.put(opId, checkpointInfo.map{ c =>
currentCheckpointUniqueId.put(opId, checkpointInfo.map { c =>
assert(c.checkpointId.isDefined)
c.checkpointId.get
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,20 @@ class RocksDB(
@volatile private var changelogWriter: Option[StateStoreChangelogWriter] = None
private val enableChangelogCheckpointing: Boolean = conf.enableChangelogCheckpointing
@volatile private var loadedVersion = -1L // -1 = nothing valid is loaded

// Variables to manage checkpoint IDs. Once a checkpoint finishes, it needs to return
// `lastCommittedCheckpointId` as the committed checkpointID, as well as
// `LastCommitBasedCheckpointId` as the checkpointID of the previous version it is based on.
// `loadedCheckpointId` is the checkpointID for the current live DB. After the batch finishes
// and the checkpoint finishes, it will turn into `LastCommitBasedCheckpointId`.
// `sessionCheckpointId` stores an ID to be used for future checkpoints. It keeps being used
// until we have to use a new one. We don't need to reuse any uniqueID, but reusing when possible
// can help debug problems.
@volatile private var LastCommitBasedCheckpointId: Option[String] = None
@volatile private var lastCommittedCheckpointId: Option[String] = None
@volatile private var loadedCheckpointId: Option[String] = None
@volatile private var sessionCheckpointId: Option[String] = None

@volatile private var numKeysOnLoadedVersion = 0L
@volatile private var numKeysOnWritingVersion = 0L
@volatile private var fileManagerMetrics = RocksDBFileManagerMetrics.EMPTY_METRICS
Expand Down Expand Up @@ -338,6 +348,8 @@ class RocksDB(
loadedVersion = -1 // invalidate loaded data
LastCommitBasedCheckpointId = None
lastCommittedCheckpointId = None
loadedCheckpointId = None
sessionCheckpointId = None
throw t
}
if (enableChangelogCheckpointing && !readOnly) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,11 @@ case class RangeKeyScanStateEncoderSpec(
* version of the data can be accessed. It is the responsibility of the provider to populate
* this store with context information like the schema of keys and values, etc.
*
* If the checkpoint format version 2 is used, an additional argument `checkpointUniqueId` may be
* provided as part of `getStore(version, checkpointUniqueId)`. The provider needs to guarantee
* that the loaded version is of this unique ID. It needs to load the version for this specific
* ID from the checkpoint if needed.
*
* - After the streaming query is stopped, the created provider instances are lazily disposed of.
*/
trait StateStoreProvider {
Expand Down Expand Up @@ -405,13 +410,15 @@ trait StateStoreProvider {
/** Called when the provider instance is unloaded from the executor */
def close(): Unit

/** Return an instance of [[StateStore]] representing state data of the given version */
/** Return an instance of [[StateStore]] representing state data of the given version.
* If `checkpointUniqueId` is provided, the instance also needs to match the ID. */
def getStore(
version: Long,
checkpointUniqueId: Option[String] = None): StateStore

/**
* Return an instance of [[ReadStateStore]] representing state data of the given version.
* Return an instance of [[ReadStateStore]] representing state data of the given version
* and uniqueID if provided.
* By default it will return the same instance as getStore(version) but wrapped to prevent
* modification. Providers can override and return optimized version of [[ReadStateStore]]
* based on the fact the instance will be only used for reading.
Expand Down

0 comments on commit cb9a5ab

Please sign in to comment.