Fix data size metric always 0 when using RAPIDS shuffle #603

Merged: 1 commit, Aug 21, 2020
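For context, the fix threads a map of SQL metrics from the GPU shuffle exchange down to the RAPIDS caching shuffle writer, which adds the uncompressed size of every batch it caches to the "dataSize" metric. Below is a minimal sketch of that pattern using Spark's `SQLMetrics` helper; the function names are illustrative, and only the "dataSize" key and the `+=` call mirror the diff that follows.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

// Sketch only: the exchange side registers a size metric under the key the
// writer expects, and hands the whole map down via the shuffle dependency.
def buildShuffleMetrics(sc: SparkContext): Map[String, SQLMetric] =
  Map("dataSize" -> SQLMetrics.createSizeMetric(sc, "data size"))

// Writer side: resolve the metric once, then add the uncompressed bytes of
// each cached batch so the SQL UI no longer reports 0.
def recordUncompressedSize(metrics: Map[String, SQLMetric], bytes: Long): Unit =
  metrics("dataSize") += bytes
```
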
@@ -22,6 +22,7 @@ import org.apache.spark.{Aggregator, Partitioner, ShuffleDependency, SparkEnv}
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.ShuffleWriteProcessor
+ import org.apache.spark.sql.execution.metric.SQLMetric

class GpuShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
rdd: RDD[_ <: Product2[K, V]],
@@ -30,10 +31,10 @@ class GpuShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
keyOrdering: Option[Ordering[K]] = None,
aggregator: Option[Aggregator[K, V, C]] = None,
mapSideCombine: Boolean = false,
- shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)
+ shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor,
+ val metrics: Map[String, SQLMetric] = Map.empty)
extends ShuffleDependency[K, V, C](rdd, partitioner, serializer, keyOrdering,
aggregator, mapSideCombine, shuffleWriterProcessor) {

override def toString: String = "GPU Shuffle Dependency"
}
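Because the new `metrics` parameter defaults to `Map.empty`, existing call sites keep compiling unchanged; only the GPU exchange needs to pass its metric map. A hedged sketch of a call site follows (like the plugin's own code, it would need to live in an `org.apache.spark` package, since `ShuffleWriteProcessor` is not public Spark API, and the plugin classes `GpuShuffleDependency` and `BatchPartitionIdPassthrough` are assumed to be importable).

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.vectorized.ColumnarBatch

// Sketch of a call site. Argument order follows GpuShuffleExchangeExec further
// down; `additionalMetrics` is expected to carry the "dataSize" entry that the
// caching writer looks up.
def buildDependency(
    rddWithPartitionIds: RDD[Product2[Int, ColumnarBatch]],
    numPartitions: Int,
    serializer: Serializer,
    writeMetrics: Map[String, SQLMetric],
    additionalMetrics: Map[String, SQLMetric])
    : GpuShuffleDependency[Int, ColumnarBatch, ColumnarBatch] =
  new GpuShuffleDependency[Int, ColumnarBatch, ColumnarBatch](
    rddWithPartitionIds,
    new BatchPartitionIdPassthrough(numPartitions),
    serializer,
    shuffleWriterProcessor = ShuffleExchangeExec.createShuffleWriteProcessor(writeMetrics),
    metrics = additionalMetrics) // later readable as dependency.metrics("dataSize")
```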

@@ -28,13 +28,14 @@ import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.shuffle._
import org.apache.spark.shuffle.sort.SortShuffleManager
+ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.storage._

class GpuShuffleHandle[K, V](
val wrapped: ShuffleHandle,
- override val dependency: ShuffleDependency[K, V, V])
+ override val dependency: GpuShuffleDependency[K, V, V])
extends BaseShuffleHandle(wrapped.shuffleId, dependency) {

override def toString: String = s"GPU SHUFFLE HANDLE $shuffleId"
@@ -76,14 +77,16 @@ class RapidsCachingWriter[K, V](
// the data being released
handle: GpuShuffleHandle[K, V],
mapId: Long,
- metrics: ShuffleWriteMetricsReporter,
+ metricsReporter: ShuffleWriteMetricsReporter,
catalog: ShuffleBufferCatalog,
shuffleStorage: RapidsDeviceMemoryStore,
- rapidsShuffleServer: Option[RapidsShuffleServer]) extends ShuffleWriter[K, V] with Logging {
+ rapidsShuffleServer: Option[RapidsShuffleServer],
+ metrics: Map[String, SQLMetric]) extends ShuffleWriter[K, V] with Logging {

private val numParts = handle.dependency.partitioner.numPartitions
private val sizes = new Array[Long](numParts)
private val writtenBufferIds = new ArrayBuffer[ShuffleBufferId](numParts)
+ private val uncompressedMetric: SQLMetric = metrics("dataSize")

override def write(records: Iterator[Product2[K, V]]): Unit = {
val nvtxRange = new NvtxRange("RapidsCachingWriter.write", NvtxColor.CYAN)
@@ -105,6 +108,7 @@
case c: GpuColumnVectorFromBuffer =>
val buffer = c.getBuffer.slice(0, c.getBuffer.getLength)
partSize = buffer.getLength
+ uncompressedMetric += partSize
shuffleStorage.addTable(
bufferId,
GpuColumnVector.from(batch),
@@ -116,6 +120,7 @@
val tableMeta = c.getTableMeta
// update the table metadata for the buffer ID generated above
tableMeta.bufferMeta.mutateId(bufferId.tableId)
+ uncompressedMetric += tableMeta.bufferMeta().uncompressedSize()
shuffleStorage.addBuffer(
bufferId,
buffer,
@@ -141,8 +146,8 @@
}
writtenBufferIds.append(bufferId)
}
- metrics.incBytesWritten(bytesWritten)
- metrics.incRecordsWritten(recordsWritten)
+ metricsReporter.incBytesWritten(bytesWritten)
+ metricsReporter.incRecordsWritten(recordsWritten)
} finally {
nvtxRange.close()
}
@@ -266,7 +271,8 @@ abstract class RapidsShuffleInternalManagerBase(conf: SparkConf, isDriver: Boole
// Always register with the wrapped handler so we can write to it ourselves if needed
val orig = wrapped.registerShuffle(shuffleId, dependency)
if (!shouldFallThroughOnEverything && dependency.isInstanceOf[GpuShuffleDependency[K, V, C]]) {
- val handle = new GpuShuffleHandle(orig, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
+ val handle = new GpuShuffleHandle(orig,
+ dependency.asInstanceOf[GpuShuffleDependency[K, V, V]])
handle
} else {
orig
@@ -277,20 +283,21 @@
handle: ShuffleHandle,
mapId: Long,
context: TaskContext,
- metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
+ metricsReporter: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
handle match {
case gpu: GpuShuffleHandle[_, _] =>
registerGpuShuffle(handle.shuffleId)
new RapidsCachingWriter(
env.blockManager,
gpu.asInstanceOf[GpuShuffleHandle[K, V]],
mapId,
- metrics,
+ metricsReporter,
catalog,
GpuShuffleEnv.getDeviceStorage,
- server)
+ server,
+ gpu.dependency.metrics)
case other =>
- wrapped.getWriter(other, mapId, context, metrics)
+ wrapped.getWriter(other, mapId, context, metricsReporter)
}
}

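Two details in this file are easy to miss: the `ShuffleWriteMetricsReporter` argument is renamed to `metricsReporter`, which frees the name `metrics` for the new SQLMetric map, and the handle's dependency field is narrowed to `GpuShuffleDependency` so that `getWriter` can reach `gpu.dependency.metrics`. A condensed sketch of what the writer does with the two channels follows; the class and parameter names here are illustrative, and since `ShuffleWriteMetricsReporter` is package-private to Spark, code like this has to live in an `org.apache.spark` package, as the plugin's shuffle manager does.

```scala
import org.apache.spark.shuffle.ShuffleWriteMetricsReporter
import org.apache.spark.sql.execution.metric.SQLMetric

// Sketch of the two metric channels the writer now receives side by side.
class WriteSideMetrics(
    metricsReporter: ShuffleWriteMetricsReporter, // Spark's per-task write metrics
    metrics: Map[String, SQLMetric]) {            // SQL UI metrics from the exchange

  // Resolved once, mirroring `uncompressedMetric` in RapidsCachingWriter.
  private val uncompressedMetric: SQLMetric = metrics("dataSize")

  def onBatchCached(uncompressedBytes: Long, storedBytes: Long, records: Long): Unit = {
    uncompressedMetric += uncompressedBytes      // the previously missing update
    metricsReporter.incBytesWritten(storedBytes) // unchanged reporter path
    metricsReporter.incRecordsWritten(records)
  }
}
```
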
@@ -28,9 +28,8 @@ import org.apache.spark.serializer.Serializer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
- import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
- import org.apache.spark.sql.execution.{ShufflePartitionSpec, SparkPlan}
+ import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
import org.apache.spark.sql.execution.metric._
import org.apache.spark.sql.internal.SQLConf
@@ -110,7 +109,8 @@ abstract class GpuShuffleExchangeExecBase(
outputPartitioning,
serializer,
metrics,
- writeMetrics)
+ writeMetrics,
+ additionalMetrics)
}

/**
@@ -137,7 +137,8 @@ object GpuShuffleExchangeExec {
newPartitioning: Partitioning,
serializer: Serializer,
metrics: Map[String, SQLMetric],
- writeMetrics: Map[String, SQLMetric])
+ writeMetrics: Map[String, SQLMetric],
+ additionalMetrics: Map[String, SQLMetric])
: ShuffleDependency[Int, ColumnarBatch, ColumnarBatch] = {
val isRoundRobin = newPartitioning match {
case _: GpuRoundRobinPartitioning => true
@@ -228,7 +229,8 @@ object GpuShuffleExchangeExec {
rddWithPartitionIds,
new BatchPartitionIdPassthrough(newPartitioning.numPartitions),
serializer,
- shuffleWriterProcessor = ShuffleExchangeExec.createShuffleWriteProcessor(writeMetrics))
+ shuffleWriterProcessor = ShuffleExchangeExec.createShuffleWriteProcessor(writeMetrics),
+ metrics = additionalMetrics)

dependency
}
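As an illustrative way to confirm the fix (not part of the PR): run a query that shuffles and read the metric off the executed plan. This assumes a session where the RAPIDS plugin and its shuffle manager are configured, and that the GPU exchange exposes the metric under the same "dataSize" key used above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.exchange.Exchange

// Illustrative check: with the RAPIDS plugin and shuffle manager enabled,
// the exchange's "dataSize" metric should now be non-zero after a shuffle.
val spark = SparkSession.builder().appName("data-size-check").getOrCreate()

val df = spark.range(0, 100000).repartition(8)
df.collect() // forces the shuffle using df's own query execution

val dataSizes = df.queryExecution.executedPlan.collect {
  case e: Exchange => e.metrics.get("dataSize").map(_.value)
}
println(s"dataSize per exchange: $dataSizes")
```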