[SPARK-45084][SS] StateOperatorProgress to use accurate effective shuffle partition number

### What changes were proposed in this pull request?
Make StateOperatorProgress.numShufflePartitions report the effective number of shuffle partitions for the query plan.
The SQL metric StateStoreWriter.numShufflePartitions is dropped at the same time, as the value is no longer collected as a metric.

### Why are the changes needed?
Currently, a numShufflePartitions "metric" is reported in the StateOperatorProgress part of the progress report. However, the number is aggregated across executor tasks, so in the case of task retries or speculative execution, the reported value is higher than the number of shuffle partitions in the query plan. We change the field to use the effective partition count from the plan to make it more usable.
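As a hypothetical sketch (not the actual Spark code; the names and values below are illustrative only), this shows why summing a per-task-attempt counter overstates the partition count under retries, while taking the count from the plan does not:

```scala
// Hypothetical sketch: contrast the old metric-style aggregation with the
// new plan-derived count. Names and values are made up for illustration.
object ShufflePartitionCount {
  // Old approach: every task attempt increments the metric by one, so the
  // aggregated value counts retried and speculative attempts twice.
  def metricStyleCount(taskAttempts: Seq[Int]): Long =
    taskAttempts.map(_ => 1L).sum

  // New approach: read the effective partition count from the plan's state
  // info, falling back to -1 when it is absent (mirroring getOrElse(-1L)).
  def effectiveCount(planPartitions: Option[Int]): Long =
    planPartitions.map(_.toLong).getOrElse(-1L)

  def main(args: Array[String]): Unit = {
    // Partition 3 ran twice: one retry or speculative attempt.
    val attempts = Seq(0, 1, 2, 3, 3, 4)
    println(s"metric=${metricStyleCount(attempts)} effective=${effectiveCount(Some(5))}")
  }
}
```

With five plan partitions and one retried task, the old aggregation reports 6 while the plan-derived value stays at 5.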

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
StreamingAggregationSuite contains a unit test that validates the value.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #42822 from siying/numShufflePartitionsMetric.

Authored-by: Siying Dong <siying.dong@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
siying authored and HeartSaVioR committed Sep 15, 2023
1 parent db5f37c commit 25c624f
Showing 1 changed file with 3 additions and 3 deletions.
```diff
@@ -143,7 +143,6 @@ trait StateStoreWriter extends StatefulOperator with PythonSQLMetrics { self: Sp
     "allRemovalsTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to remove"),
     "commitTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to commit changes"),
     "stateMemory" -> SQLMetrics.createSizeMetric(sparkContext, "memory used by state"),
-    "numShufflePartitions" -> SQLMetrics.createMetric(sparkContext, "number of shuffle partitions"),
     "numStateStoreInstances" -> SQLMetrics.createMetric(sparkContext,
       "number of state store instances")
   ) ++ stateStoreCustomMetrics ++ pythonMetrics
@@ -159,6 +158,8 @@ trait StateStoreWriter extends StatefulOperator with PythonSQLMetrics { self: Sp
     val javaConvertedCustomMetrics: java.util.HashMap[String, java.lang.Long] =
       new java.util.HashMap(customMetrics.mapValues(long2Long).toMap.asJava)

+    // We now don't report number of shuffle partitions inside the state operator. Instead,
+    // it will be filled when the stream query progress is reported
    new StateOperatorProgress(
      operatorName = shortName,
      numRowsTotal = longMetric("numTotalStateRows").value,
@@ -169,7 +170,7 @@ trait StateStoreWriter extends StatefulOperator with PythonSQLMetrics { self: Sp
      commitTimeMs = longMetric("commitTimeMs").value,
      memoryUsedBytes = longMetric("stateMemory").value,
      numRowsDroppedByWatermark = longMetric("numRowsDroppedByWatermark").value,
-     numShufflePartitions = longMetric("numShufflePartitions").value,
+     numShufflePartitions = stateInfo.map(_.numPartitions.toLong).getOrElse(-1L),
      numStateStoreInstances = longMetric("numStateStoreInstances").value,
      javaConvertedCustomMetrics
    )
@@ -183,7 +184,6 @@ trait StateStoreWriter extends StatefulOperator with PythonSQLMetrics { self: Sp
     assert(numStateStoreInstances >= 1, s"invalid number of stores: $numStateStoreInstances")
     // Shuffle partitions capture the number of tasks that have this stateful operator instance.
     // For each task instance this number is incremented by one.
-    longMetric("numShufflePartitions") += 1
     longMetric("numStateStoreInstances") += numStateStoreInstances
   }
```
