Commit a1553ac
GpuPartitioning should close CVs before releasing semaphore
abellina committed Oct 25, 2022
1 parent b58d63b commit a1553ac
Showing 6 changed files with 40 additions and 37 deletions.
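
The rename from slice... to slice...AndClose across these files encodes an ownership contract: a method whose name ends in AndClose takes ownership of the column vectors passed to it and closes them itself, instead of leaving that to the caller. A minimal, self-contained Scala sketch of the contract (the Buffer class and partitionAndClose are illustrative stand-ins, not plugin code):

    object AndCloseContract {
      def withResource[T <: AutoCloseable, V](r: T)(block: T => V): V =
        try block(r) finally r.close()

      final class Buffer(val rows: Int) extends AutoCloseable {
        override def close(): Unit = println(s"closed buffer of $rows rows")
      }

      // Takes ownership of buf: the caller must not use or close it afterwards.
      def partitionAndClose(buf: Buffer): Array[Int] =
        withResource(buf) { b => Array.tabulate(4)(i => i * b.rows / 4) }

      def main(args: Array[String]): Unit = {
        val offsets = partitionAndClose(new Buffer(100)) // no caller-side close needed
        println(offsets.mkString(","))
      }
    }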
@@ -67,9 +67,7 @@ abstract class GpuHashPartitioningBase(expressions: Seq[Expression], numPartitio
           partitionInternalAndClose(batch)
         }
       }
-      val ret = withResource(partitionColumns) { partitionColumns =>
-        sliceInternalGpuOrCpu(numRows, partitionIndexes, partitionColumns)
-      }
+      val ret = sliceInternalGpuOrCpuAndClose(numRows, partitionIndexes, partitionColumns)
       // Close the partition columns we copied them as a part of the slice
       ret.zipWithIndex.filter(_._1 != null)
     }
@@ -49,14 +49,17 @@ trait GpuPartitioning extends Partitioning with Arm {
     ret
   }
 
-  def sliceInternalOnGpu(numRows: Int, partitionIndexes: Array[Int],
+  def sliceInternalOnGpuAndClose(numRows: Int, partitionIndexes: Array[Int],
       partitionColumns: Array[GpuColumnVector]): Array[ColumnarBatch] = {
     // The first index will always be 0, so we need to skip it.
     val batches = if (numRows > 0) {
       val parts = partitionIndexes.slice(1, partitionIndexes.length)
       closeOnExcept(new ArrayBuffer[ColumnarBatch](numPartitions)) { splits =>
-        val table = new Table(partitionColumns.map(_.getBase).toArray: _*)
-        val contiguousTables = withResource(table)(t => t.contiguousSplit(parts: _*))
+        val contiguousTables = withResource(partitionColumns) { _ =>
+          withResource(new Table(partitionColumns.map(_.getBase).toArray: _*)) { table =>
+            table.contiguousSplit(parts: _*)
+          }
+        }
         GpuShuffleEnv.rapidsShuffleCodec match {
           case Some(codec) =>
             compressSplits(splits, codec, contiguousTables)
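
The replacement above nests two withResource blocks so that both the input columns and the temporary Table are closed even when contiguousSplit throws. A self-contained sketch of that failure behavior (Tracked is an illustrative stand-in; withResource here is a simplified version of the plugin's Arm helper):

    object NestedWithResource {
      def withResource[T <: AutoCloseable, V](r: T)(block: T => V): V =
        try block(r) finally r.close()

      final class Tracked(name: String) extends AutoCloseable {
        override def close(): Unit = println(s"closed $name")
      }

      def main(args: Array[String]): Unit = {
        try {
          withResource(new Tracked("columns")) { _ =>
            withResource(new Tracked("table")) { _ =>
              throw new RuntimeException("split failed") // both still get closed
            }
          }
        } catch {
          case e: RuntimeException => println(s"caught: ${e.getMessage}")
        }
      }
    }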
@@ -80,13 +83,15 @@ trait GpuPartitioning extends Partitioning with Arm {
     batches
   }
 
-  def sliceInternalOnCpu(numRows: Int, partitionIndexes: Array[Int],
+  def sliceInternalOnCpuAndClose(numRows: Int, partitionIndexes: Array[Int],
      partitionColumns: Array[GpuColumnVector]): Array[ColumnarBatch] = {
     // We need to make sure that we have a null count calculated ahead of time.
     // This should be a temp work around.
     partitionColumns.foreach(_.getBase.getNullCount)
 
-    val hostPartColumns = partitionColumns.map(_.copyToHost())
+    val hostPartColumns = withResource(partitionColumns) { _ =>
+      partitionColumns.map(_.copyToHost())
+    }
     try {
       // Leaving the GPU for a while
       GpuSemaphore.releaseIfNecessary(TaskContext.get())
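
This hunk is the heart of the commit: the device-side partitionColumns are now closed by withResource before GpuSemaphore.releaseIfNecessary runs, so a task no longer holds GPU column vectors after giving up its semaphore slot. A minimal sketch of that ordering (Semaphore from java.util.concurrent stands in for GpuSemaphore, DeviceBuffer for GPU column vectors; both are assumptions for illustration):

    import java.util.concurrent.Semaphore

    object CloseBeforeRelease {
      private val gpuSlots = new Semaphore(1)

      final class DeviceBuffer(data: Array[Byte]) extends AutoCloseable {
        def copyToHost(): Array[Byte] = data.clone()
        override def close(): Unit = println("device buffer freed")
      }

      def sliceOnCpuAndClose(dev: DeviceBuffer): Array[Byte] = {
        val host = try dev.copyToHost() finally dev.close() // free device memory first...
        gpuSlots.release() // ...then let the next task onto the GPU
        host
      }

      def main(args: Array[String]): Unit = {
        gpuSlots.acquire()
        println(sliceOnCpuAndClose(new DeviceBuffer(Array[Byte](1, 2, 3))).length)
      }
    }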
@@ -105,7 +110,7 @@ trait GpuPartitioning extends Partitioning with Arm {
     }
   }
 
-  def sliceInternalGpuOrCpu(numRows: Int, partitionIndexes: Array[Int],
+  def sliceInternalGpuOrCpuAndClose(numRows: Int, partitionIndexes: Array[Int],
       partitionColumns: Array[GpuColumnVector]): Array[ColumnarBatch] = {
     val sliceOnGpu = usesGPUShuffle
     val nvtxRangeKey = if (sliceOnGpu) {
@@ -117,9 +122,9 @@ trait GpuPartitioning extends Partitioning with Arm {
     // for large number of small splits.
     withResource(new NvtxRange(nvtxRangeKey, NvtxColor.CYAN)) { _ =>
       if (sliceOnGpu) {
-        sliceInternalOnGpu(numRows, partitionIndexes, partitionColumns)
+        sliceInternalOnGpuAndClose(numRows, partitionIndexes, partitionColumns)
       } else {
-        sliceInternalOnCpu(numRows, partitionIndexes, partitionColumns)
+        sliceInternalOnCpuAndClose(numRows, partitionIndexes, partitionColumns)
       }
     }
   }
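
As an aside on the surrounding code: NvtxRange is itself an AutoCloseable, so wrapping it in withResource guarantees the profiling range is popped even if slicing fails. A sketch of the same scoped pattern, with a hypothetical TimedRange in place of NvtxRange:

    object ScopedRange {
      def withResource[T <: AutoCloseable, V](r: T)(block: T => V): V =
        try block(r) finally r.close()

      final class TimedRange(name: String) extends AutoCloseable {
        private val start = System.nanoTime()
        override def close(): Unit =
          println(s"$name took ${(System.nanoTime() - start) / 1000} us")
      }

      def main(args: Array[String]): Unit =
        withResource(new TimedRange("slice partitions")) { _ =>
          Thread.sleep(5) // stand-in for the slice work
        }
    }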
@@ -220,19 +220,15 @@ case class GpuRangePartitioner(
   override def columnarEval(batch: ColumnarBatch): Any = {
     if (rangeBounds.nonEmpty) {
       val (parts, partitionColumns) = computeBoundsAndClose(batch)
-      withResource(partitionColumns) { partitionColumns =>
-        val slicedCb = sliceInternalGpuOrCpu(partitionColumns.head.getRowCount.toInt,
-          parts, partitionColumns)
-        slicedCb.zipWithIndex.filter(_._1 != null)
-      }
+      val slicedCb = sliceInternalGpuOrCpuAndClose(partitionColumns.head.getRowCount.toInt,
+        parts, partitionColumns)
+      slicedCb.zipWithIndex.filter(_._1 != null)
     } else {
-      withResource(batch) { cb =>
-        // Nothing needs to be sliced but a contiguous table is needed for GPU shuffle which
-        // slice will produce.
-        val sliced = sliceInternalGpuOrCpu(cb.numRows, Array(0),
-          GpuColumnVector.extractColumns(cb))
-        sliced.zipWithIndex.filter(_._1 != null)
-      }
+      // Nothing needs to be sliced but a contiguous table is needed for GPU shuffle which
+      // slice will produce.
+      val sliced = sliceInternalGpuOrCpuAndClose(batch.numRows, Array(0),
+        GpuColumnVector.extractColumns(batch))
+      sliced.zipWithIndex.filter(_._1 != null)
     }
   }
 }
@@ -19,7 +19,6 @@ package com.nvidia.spark.rapids
 import java.util.Random
 
 import ai.rapids.cudf.{NvtxColor, NvtxRange}
-import com.nvidia.spark.rapids.RapidsPluginImplicits._
 import com.nvidia.spark.rapids.shims.ShimExpression
 
 import org.apache.spark.TaskContext
@@ -78,8 +77,7 @@ case class GpuRoundRobinPartitioning(numPartitions: Int)
       }
     }
     val ret: Array[ColumnarBatch] =
-      sliceInternalGpuOrCpu(numRows, partitionIndexes, partitionColumns)
-    partitionColumns.safeClose()
+      sliceInternalGpuOrCpuAndClose(numRows, partitionIndexes, partitionColumns)
     // Close the partition columns we copied them as a part of the slice
     ret.zipWithIndex.filter(_._1 != null)
   } finally {
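
With sliceInternalGpuOrCpuAndClose owning partitionColumns, the old caller-side partitionColumns.safeClose() would have become a double close, which is why it is deleted rather than moved. A sketch of that hazard (StrictBuffer is illustrative; cudf column vectors are reference-counted, but the effect of one close too many is the same):

    final class StrictBuffer extends AutoCloseable {
      private var closed = false
      override def close(): Unit = {
        if (closed) throw new IllegalStateException("double close!")
        closed = true
      }
    }

    object DoubleCloseDemo {
      // Owns buf and closes it, mirroring the ...AndClose contract.
      def sliceAndClose(buf: StrictBuffer): Int = try 42 finally buf.close()

      def main(args: Array[String]): Unit = {
        val buf = new StrictBuffer
        val result = sliceAndClose(buf)
        // buf.close()  // the old caller-side close: would throw IllegalStateException
        println(result)
      }
    }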
@@ -42,15 +42,13 @@ case object GpuSinglePartitioning extends GpuExpression with ShimExpression
     if (batch.numCols == 0) {
       Array(batch).zipWithIndex
     } else {
-      withResource(batch) { batch =>
-        // Nothing needs to be sliced but a contiguous table is needed for GPU shuffle which
-        // slice will produce.
-        val sliced = sliceInternalGpuOrCpu(
-          batch.numRows,
-          Array(0),
-          GpuColumnVector.extractColumns(batch))
-        sliced.zipWithIndex.filter(_._1 != null)
-      }
+      // Nothing needs to be sliced but a contiguous table is needed for GPU shuffle which
+      // slice will produce.
+      val sliced = sliceInternalGpuOrCpuAndClose(
+        batch.numRows,
+        Array(0),
+        GpuColumnVector.extractColumns(batch))
+      sliced.zipWithIndex.filter(_._1 != null)
     }
   }
 
@@ -118,9 +118,13 @@ class GpuPartitioningSuite extends FunSuite with Arm {
       override val numPartitions: Int = partitionIndices.length
     }
     withResource(buildBatch()) { batch =>
+      // `sliceInternalOnGpuAndClose` will close the batch, but in this test we want to
+      // reuse it
+      GpuColumnVector.incRefCounts(batch)
       val columns = GpuColumnVector.extractColumns(batch)
       val numRows = batch.numRows
-      withResource(gp.sliceInternalOnGpu(numRows, partitionIndices, columns)) { partitions =>
+      withResource(
+          gp.sliceInternalOnGpuAndClose(numRows, partitionIndices, columns)) { partitions =>
         partitions.zipWithIndex.foreach { case (partBatch, partIndex) =>
           val startRow = partitionIndices(partIndex)
           val endRow = if (partIndex < partitionIndices.length - 1) {
@@ -153,10 +157,14 @@ class GpuPartitioningSuite extends FunSuite with Arm {
       override val numPartitions: Int = partitionIndices.length
     }
     withResource(buildBatch()) { batch =>
+      // `sliceInternalOnGpuAndClose` will close the batch, but in this test we want to
+      // reuse it
+      GpuColumnVector.incRefCounts(batch)
       val columns = GpuColumnVector.extractColumns(batch)
       val sparkTypes = GpuColumnVector.extractTypes(batch)
       val numRows = batch.numRows
-      withResource(gp.sliceInternalOnGpu(numRows, partitionIndices, columns)) { partitions =>
+      withResource(
+          gp.sliceInternalOnGpuAndClose(numRows, partitionIndices, columns)) { partitions =>
         partitions.zipWithIndex.foreach { case (partBatch, partIndex) =>
           val startRow = partitionIndices(partIndex)
           val endRow = if (partIndex < partitionIndices.length - 1) {
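
The tests keep using batch after handing it to sliceInternalOnGpuAndClose, so they first bump its reference count: the AndClose call then only drops the extra reference. A simplified sketch of that reference-counting idea (RefCounted is illustrative, not cudf's implementation):

    final class RefCounted extends AutoCloseable {
      private var refs = 1
      def incRefCount(): this.type = { refs += 1; this }
      override def close(): Unit = {
        refs -= 1
        if (refs == 0) println("really freed")
      }
    }

    object TestReuse {
      def consumeAndClose(r: RefCounted): Unit = r.close() // drops one reference

      def main(args: Array[String]): Unit = {
        val batch = new RefCounted
        consumeAndClose(batch.incRefCount()) // batch still alive afterwards
        batch.close()                        // now really freed
      }
    }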