Show partition metrics for custom shuffle reader (NVIDIA#1060)
* Add partition metrics to custom shuffle reader

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* revert change

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* refactor to combine metrics and reader in single match statement

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* use shim layer to get map output sizes

Signed-off-by: Andy Grove <andygrove@nvidia.com>
andygrove authored Nov 20, 2020
1 parent ae630a4 commit 4fe637f
Showing 4 changed files with 39 additions and 8 deletions.
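
Before the file-by-file changes, a quick note on the "use shim layer to get map output sizes" step: the map-output lookup goes through the plugin's Spark shim (the `shim.getMapSizesByExecutorId` call in the ShuffledBatchRDD changes below) so that version-specific MapOutputTracker APIs stay out of the common code path. A rough sketch of such a shim entry point, inferred only from the call sites in this diff; the trait name MapOutputShim and the exact return type are assumptions, not the plugin's real definitions:

import org.apache.spark.storage.{BlockId, BlockManagerId}

// Rough sketch only: each Spark-version-specific shim implementation would
// delegate to whatever map-output API that Spark release provides.
trait MapOutputShim {
  /**
   * Shuffle block sizes grouped by executor for `shuffleId`, restricted to map
   * outputs in [startMapIndex, endMapIndex) and reduce partitions in
   * [startPartition, endPartition).
   */
  def getMapSizesByExecutorId(
      shuffleId: Int,
      startMapIndex: Int,
      endMapIndex: Int,
      startPartition: Int,
      endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
}

The ShuffledBatchRDD changes at the bottom of this commit sum the per-block sizes from this structure to populate the new partitionSize metric.
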
@@ -33,6 +33,8 @@ object GpuMetricNames {
val NUM_INPUT_BATCHES = "numInputBatches"
val NUM_OUTPUT_ROWS = "numOutputRows"
val NUM_OUTPUT_BATCHES = "numOutputBatches"
val PARTITION_SIZE = "partitionSize"
val NUM_PARTITIONS = "numPartitions"
val TOTAL_TIME = "totalTime"
val PEAK_DEVICE_MEMORY = "peakDevMemory"

@@ -41,6 +43,8 @@ object GpuMetricNames {
val DESCRIPTION_NUM_INPUT_BATCHES = "number of input columnar batches"
val DESCRIPTION_NUM_OUTPUT_ROWS = "number of output rows"
val DESCRIPTION_NUM_OUTPUT_BATCHES = "number of output columnar batches"
val DESCRIPTION_PARTITION_SIZE = "partition data size"
val DESCRIPTION_NUM_PARTITIONS = "number of partitions"
val DESCRIPTION_TOTAL_TIME = "total time"
val DESCRIPTION_PEAK_DEVICE_MEMORY = "peak device memory"

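
For reference, a minimal usage sketch of the two new keys once an operator has registered them. Here `gpuPlan` is a hypothetical reference to an executed GPU exec node (such as the GpuCustomShuffleReaderExec changed below) and is not part of this commit:

// Hypothetical lookup of the new metrics after a query has run.
// `gpuPlan` stands in for a GPU plan node whose metrics map contains the
// new entries; SQLMetric.value returns a Long.
val partitionBytes: Long = gpuPlan.metrics(GpuMetricNames.PARTITION_SIZE).value
val partitionCount: Long = gpuPlan.metrics(GpuMetricNames.NUM_PARTITIONS).value
println(s"read $partitionBytes bytes across $partitionCount shuffle partitions")
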
@@ -16,7 +16,7 @@
package org.apache.spark.sql.rapids.execution

import com.nvidia.spark.rapids.{CoalesceGoal, GpuExec, ShimLoader}
import com.nvidia.spark.rapids.GpuMetricNames.{DESCRIPTION_TOTAL_TIME, TOTAL_TIME}
import com.nvidia.spark.rapids.GpuMetricNames.{DESCRIPTION_NUM_PARTITIONS, DESCRIPTION_PARTITION_SIZE, DESCRIPTION_TOTAL_TIME, NUM_PARTITIONS, PARTITION_SIZE, TOTAL_TIME}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -47,6 +47,10 @@ case class GpuCustomShuffleReaderExec(
* The Spark version of this operator does not output any metrics.
*/
override lazy val metrics: Map[String, SQLMetric] = Map(
PARTITION_SIZE ->
SQLMetrics.createSizeMetric(sparkContext, DESCRIPTION_PARTITION_SIZE),
NUM_PARTITIONS ->
SQLMetrics.createMetric(sparkContext, DESCRIPTION_NUM_PARTITIONS),
TOTAL_TIME -> SQLMetrics.createNanoTimingMetric(sparkContext, DESCRIPTION_TOTAL_TIME)
)

@@ -20,7 +20,7 @@ import scala.collection.AbstractIterator
import scala.concurrent.Future

import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.GpuMetricNames.{DESCRIPTION_NUM_OUTPUT_BATCHES, DESCRIPTION_NUM_OUTPUT_ROWS, NUM_OUTPUT_BATCHES, NUM_OUTPUT_ROWS}
import com.nvidia.spark.rapids.GpuMetricNames.{DESCRIPTION_NUM_OUTPUT_BATCHES, DESCRIPTION_NUM_OUTPUT_ROWS, DESCRIPTION_NUM_PARTITIONS, DESCRIPTION_PARTITION_SIZE, NUM_OUTPUT_BATCHES, NUM_OUTPUT_ROWS, NUM_PARTITIONS, PARTITION_SIZE}
import com.nvidia.spark.rapids.RapidsPluginImplicits._

import org.apache.spark.{MapOutputStatistics, ShuffleDependency}
@@ -86,6 +86,10 @@ abstract class GpuShuffleExchangeExecBase(

// Spark doesn't report totalTime for this operator so we override metrics
override lazy val metrics: Map[String, SQLMetric] = Map(
PARTITION_SIZE ->
SQLMetrics.createMetric(sparkContext, DESCRIPTION_PARTITION_SIZE),
NUM_PARTITIONS ->
SQLMetrics.createMetric(sparkContext, DESCRIPTION_NUM_PARTITIONS),
NUM_OUTPUT_ROWS -> SQLMetrics.createMetric(sparkContext, DESCRIPTION_NUM_OUTPUT_ROWS),
NUM_OUTPUT_BATCHES -> SQLMetrics.createMetric(sparkContext, DESCRIPTION_NUM_OUTPUT_BATCHES)
) ++ additionalMetrics
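
A side note on the `++ additionalMetrics` composition: `++` on immutable Scala Maps keeps the right-hand value when keys collide, so additionalMetrics (declared elsewhere in this class) can add new metrics or redefine a base entry. A tiny standalone illustration of that merge behavior, with plain strings standing in for SQLMetric values:

// Map ++ Map: the right-hand side wins on duplicate keys.
val base  = Map("numOutputRows" -> "base", "totalTime" -> "base")
val extra = Map("totalTime" -> "override", "dataSize" -> "extra")
val combined = base ++ extra
// combined("totalTime") == "override" and combined.size == 3
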
@@ -155,23 +155,28 @@ class ShuffledBatchRDD(
}

override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = {
val shuffledRowPartition = split.asInstanceOf[ShuffledBatchRDDPartition]
val shuffledBatchPartition = split.asInstanceOf[ShuffledBatchRDDPartition]
val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics()
// `SQLShuffleReadMetricsReporter` will update its own metrics for SQL exchange operator,
// as well as the `tempMetrics` for basic shuffle metrics.
val sqlMetricsReporter = new SQLShuffleReadMetricsReporter(tempMetrics, metrics)
val shuffleManagerShims = ShimLoader.getSparkShims.getShuffleManagerShims()
val reader = split.asInstanceOf[ShuffledBatchRDDPartition].spec match {
val shim = ShimLoader.getSparkShims
val shuffleManagerShims = shim.getShuffleManagerShims()
val (reader, partitionSize) = shuffledBatchPartition.spec match {
case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
SparkEnv.get.shuffleManager.getReader(
val reader = SparkEnv.get.shuffleManager.getReader(
dependency.shuffleHandle,
startReducerIndex,
endReducerIndex,
context,
sqlMetricsReporter)
val blocksByAddress = shim.getMapSizesByExecutorId(
dependency.shuffleHandle.shuffleId, 0, Int.MaxValue, startReducerIndex, endReducerIndex)
val partitionSize = blocksByAddress.flatMap(_._2).map(_._2).sum
(reader, partitionSize)

case PartialReducerPartitionSpec(reducerIndex, startMapIndex, endMapIndex) =>
shuffleManagerShims.getReader(
val reader = shuffleManagerShims.getReader(
SparkEnv.get.shuffleManager,
dependency.shuffleHandle,
startMapIndex,
@@ -180,9 +185,15 @@
reducerIndex + 1,
context,
sqlMetricsReporter)
val blocksByAddress = shim.getMapSizesByExecutorId(
dependency.shuffleHandle.shuffleId, 0, Int.MaxValue, reducerIndex, reducerIndex + 1)
val partitionSize = blocksByAddress.flatMap(_._2)
.filter(tuple => tuple._3 >= startMapIndex && tuple._3 < endMapIndex)
.map(_._2).sum
(reader, partitionSize)

case PartialMapperPartitionSpec(mapIndex, startReducerIndex, endReducerIndex) =>
shuffleManagerShims.getReader(
val reader = shuffleManagerShims.getReader(
SparkEnv.get.shuffleManager,
dependency.shuffleHandle,
mapIndex,
@@ -191,7 +202,15 @@
endReducerIndex,
context,
sqlMetricsReporter)
val blocksByAddress = shim.getMapSizesByExecutorId(
dependency.shuffleHandle.shuffleId, 0, Int.MaxValue, startReducerIndex, endReducerIndex)
val partitionSize = blocksByAddress.flatMap(_._2)
.filter(_._3 == mapIndex)
.map(_._2).sum
(reader, partitionSize)
}
metrics(GpuMetricNames.NUM_PARTITIONS).add(1)
metrics(GpuMetricNames.PARTITION_SIZE).add(partitionSize)
reader.read().asInstanceOf[Iterator[Product2[Int, ColumnarBatch]]].map(_._2)
}

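
Pulling the three partition-spec cases together, the per-task size computation reduces to summing block sizes from the shim's map-output lookup, optionally filtered by map index. A condensed, standalone sketch, with the element type of blocksByAddress assumed to match the shim sketch near the top of this page; this is an illustration, not a drop-in replacement for the code in this commit:

import org.apache.spark.storage.{BlockId, BlockManagerId}

// Each entry of blocksByAddress is one executor paired with its
// (blockId, sizeInBytes, mapIndex) triples for the requested shuffle range.
def partitionSize(
    blocksByAddress: Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])],
    keepMapIndex: Int => Boolean = _ => true): Long = {
  blocksByAddress
    .flatMap(_._2)                                           // all (blockId, size, mapIndex) triples
    .filter { case (_, _, mapIndex) => keepMapIndex(mapIndex) }
    .map { case (_, size, _) => size }                       // block size in bytes
    .sum
}

// CoalescedPartitionSpec sums every block for its reducer range:
//   partitionSize(blocks)
// PartialReducerPartitionSpec keeps only its map-index slice:
//   partitionSize(blocks, i => i >= startMapIndex && i < endMapIndex)
// PartialMapperPartitionSpec keeps a single map output:
//   partitionSize(blocks, _ == mapIndex)
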