From cb9a5abfdabfab6266a1c4df92c83f3617381e17 Mon Sep 17 00:00:00 2001
From: Siying Dong
Date: Tue, 27 Aug 2024 13:49:01 -0700
Subject: [PATCH] comments

---
 .../execution/streaming/MicroBatchExecution.scala  |  5 ++++-
 .../sql/execution/streaming/state/RocksDB.scala    | 12 ++++++++++++
 .../sql/execution/streaming/state/StateStore.scala | 11 +++++++++--
 3 files changed, 25 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 4068b0c1bb748..5266f8a9654db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -131,6 +131,9 @@ class MicroBatchExecution(
 
   protected var watermarkTracker: WatermarkTracker = _
 
+  // Stores the checkpoint IDs for state store checkpoints that are to be committed, or have
+  // been committed, to the commit log.
+  // operatorID -> (partitionID -> uniqueID)
   private val currentCheckpointUniqueId = MutableMap[Long, Array[String]]()
 
   override lazy val logicalPlan: LogicalPlan = {
@@ -910,7 +913,7 @@
             execCtx.batchId == -1 || v == execCtx.batchId + 1,
             s"version $v doesn't match current Batch ID ${execCtx.batchId}")
         }
-        currentCheckpointUniqueId.put(opId, checkpointInfo.map{ c =>
+        currentCheckpointUniqueId.put(opId, checkpointInfo.map { c =>
           assert(c.checkpointId.isDefined)
           c.checkpointId.get
         })
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 8e2f1c8fed1c4..0babf7174f679 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -159,10 +159,20 @@ class RocksDB(
   @volatile private var changelogWriter: Option[StateStoreChangelogWriter] = None
   private val enableChangelogCheckpointing: Boolean = conf.enableChangelogCheckpointing
   @volatile private var loadedVersion = -1L   // -1 = nothing valid is loaded
+
+  // Variables to manage the checkpoint ID. Once a checkpoint finishes, it needs to return
+  // `lastCommittedCheckpointId` as the committed checkpoint ID, as well as
+  // `LastCommitBasedCheckpointId` as the checkpoint ID of the previous version it is based on.
+  // `loadedCheckpointId` is the checkpoint ID for the current live DB. After the batch and its
+  // checkpoint finish, it turns into `LastCommitBasedCheckpointId`.
+  // `sessionCheckpointId` stores an ID to be used for future checkpoints. It is kept in use
+  // until we have to switch to a new one. We don't need to reuse any unique ID, but reusing
+  // one when possible can help debug problems.
   @volatile private var LastCommitBasedCheckpointId: Option[String] = None
   @volatile private var lastCommittedCheckpointId: Option[String] = None
   @volatile private var loadedCheckpointId: Option[String] = None
   @volatile private var sessionCheckpointId: Option[String] = None
+
   @volatile private var numKeysOnLoadedVersion = 0L
   @volatile private var numKeysOnWritingVersion = 0L
   @volatile private var fileManagerMetrics = RocksDBFileManagerMetrics.EMPTY_METRICS
@@ -338,6 +348,8 @@ class RocksDB(
         loadedVersion = -1  // invalidate loaded data
         LastCommitBasedCheckpointId = None
         lastCommittedCheckpointId = None
+        loadedCheckpointId = None
+        sessionCheckpointId = None
         throw t
     }
     if (enableChangelogCheckpointing && !readOnly) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 376470f110b39..f7244c00903c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -364,6 +364,11 @@ case class RangeKeyScanStateEncoderSpec(
  *    version of the data can be accessed. It is the responsible of the provider to populate
  *    this store with context information like the schema of keys and values, etc.
  *
+ * If the checkpoint format version 2 is used, an additional argument `checkpointUniqueId` may
+ * be provided as part of `getStore(version, checkpointUniqueId)`. The provider needs to
+ * guarantee that the loaded version carries this unique ID, loading the version for this
+ * specific ID from the checkpoint if necessary.
+ *
  * - After the streaming query is stopped, the created provider instances are lazily disposed off.
  */
 trait StateStoreProvider {
@@ -405,13 +410,15 @@ trait StateStoreProvider {
   /** Called when the provider instance is unloaded from the executor */
   def close(): Unit
 
-  /** Return an instance of [[StateStore]] representing state data of the given version */
+  /** Return an instance of [[StateStore]] representing state data of the given version.
+   *  If `checkpointUniqueId` is provided, the returned instance must also match that ID. */
   def getStore(
       version: Long,
       checkpointUniqueId: Option[String] = None): StateStore
 
   /**
-   * Return an instance of [[ReadStateStore]] representing state data of the given version.
+   * Return an instance of [[ReadStateStore]] representing state data of the given version
+   * and, if provided, the given unique ID.
    * By default it will return the same instance as getStore(version) but wrapped to prevent
    * modification. Providers can override and return optimized version of [[ReadStateStore]]
    * based on the fact the instance will be only used for reading.
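
For illustration, below is a minimal, self-contained Scala sketch of the contract the new
comments describe: how the four checkpoint-ID fields could evolve across load/commit/failure,
and how a provider-style getStore(version, checkpointUniqueId) could honor a requested ID.
All names here (CheckpointIdLifecycle, SketchStore, SketchProvider) are hypothetical stand-ins,
not the actual Spark classes, and the session-ID reuse policy is deliberately simplified.

import java.util.UUID
import scala.collection.mutable

// Hypothetical model of the checkpoint-ID lifecycle in the RocksDB comment above.
class CheckpointIdLifecycle {
  @volatile private var lastCommitBasedCheckpointId: Option[String] = None
  @volatile private var lastCommittedCheckpointId: Option[String] = None
  @volatile private var loadedCheckpointId: Option[String] = None
  @volatile private var sessionCheckpointId: Option[String] = None

  // Loading a version makes its unique ID the ID of the current live DB.
  def load(checkpointUniqueId: Option[String]): Unit = {
    loadedCheckpointId = checkpointUniqueId
  }

  // Committing returns (lastCommitBasedCheckpointId, lastCommittedCheckpointId):
  // the live ID becomes the "based on" ID, and the session ID (reused when
  // possible, which helps debugging) becomes the committed ID and the new live ID.
  def commit(): (Option[String], Option[String]) = {
    lastCommitBasedCheckpointId = loadedCheckpointId
    if (sessionCheckpointId.isEmpty) {
      sessionCheckpointId = Some(UUID.randomUUID().toString)
    }
    lastCommittedCheckpointId = sessionCheckpointId
    loadedCheckpointId = sessionCheckpointId
    (lastCommitBasedCheckpointId, lastCommittedCheckpointId)
  }

  // On a failed load, all four IDs are invalidated, mirroring the error path
  // this patch adds to RocksDB.
  def invalidate(): Unit = {
    lastCommitBasedCheckpointId = None
    lastCommittedCheckpointId = None
    loadedCheckpointId = None
    sessionCheckpointId = None
  }
}

// Hypothetical store handle and provider, showing the checkpoint format v2
// contract of getStore: the loaded version must carry the requested unique ID.
case class SketchStore(version: Long, checkpointUniqueId: Option[String])

class SketchProvider {
  // Pretend checkpoint storage keyed by (version, uniqueId).
  private val checkpoints = mutable.Map[(Long, Option[String]), SketchStore]()
  @volatile private var live: Option[SketchStore] = None

  def getStore(version: Long, checkpointUniqueId: Option[String] = None): SketchStore = {
    live match {
      // Reuse the live instance only if both version and (if given) ID match.
      case Some(s) if s.version == version &&
          checkpointUniqueId.forall(s.checkpointUniqueId.contains) => s
      case _ =>
        // Otherwise load that specific (version, ID) lineage from the checkpoint.
        val s = checkpoints.getOrElse(
          (version, checkpointUniqueId),
          throw new IllegalStateException(
            s"No checkpoint for version $version with ID $checkpointUniqueId"))
        live = Some(s)
        s
    }
  }
}

The key point of the sketch is that a commit reports both the committed ID and the ID it was
based on, which, together with the operatorID -> (partitionID -> uniqueID) map written to the
commit log, presumably lets a restarted query pick out one unambiguous checkpoint lineage.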