[BUG] GpuPartitioning should close CVs before releasing semaphore #6913

Merged
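
In short, the slice helpers in GpuPartitioning are renamed to sliceInternalOnGpuAndClose, sliceInternalOnCpuAndClose and sliceInternalGpuOrCpuAndClose, and they now take ownership of the GpuColumnVectors they are given: the columns are closed inside the call (on the CPU path, before GpuSemaphore.releaseIfNecessary runs), so device memory is not still held by a task that has already given up the GPU semaphore. The hash, range, round-robin and single partitioners drop their own withResource/safeClose cleanup accordingly. The sketch below is a minimal, self-contained illustration of that ordering only; DeviceBuffer, FakeGpuSemaphore and sliceOnCpuAndClose are hypothetical stand-ins, not spark-rapids APIs.

object CloseBeforeReleaseSketch {
  // Simplified stand-in for the plugin's withResource helper over a sequence of resources.
  def withResource[T <: AutoCloseable, R](resources: Seq[T])(body: Seq[T] => R): R =
    try {
      body(resources)
    } finally {
      resources.foreach(_.close())
    }

  // Pretend device-side column: closing it frees GPU memory.
  final class DeviceBuffer(val rows: Int) extends AutoCloseable {
    def copyToHost(): Array[Int] = Array.fill(rows)(0)
    override def close(): Unit = println(s"freed device buffer with $rows rows")
  }

  // Stand-in for GpuSemaphore.releaseIfNecessary(TaskContext.get()).
  object FakeGpuSemaphore {
    def releaseIfNecessary(): Unit = println("released GPU semaphore")
  }

  // The "AndClose" convention: this method owns `cols` and closes them before
  // the task lets go of the GPU semaphore.
  def sliceOnCpuAndClose(cols: Seq[DeviceBuffer]): Seq[Array[Int]] = {
    val hostCols = withResource(cols) { _ =>
      cols.map(_.copyToHost()) // copy to host while we still hold the GPU
    }                          // <- device buffers are closed here...
    FakeGpuSemaphore.releaseIfNecessary() // ...and only then is the semaphore released
    hostCols
  }

  def main(args: Array[String]): Unit = {
    val parts = sliceOnCpuAndClose(Seq(new DeviceBuffer(4), new DeviceBuffer(2)))
    println(s"${parts.length} host partitions")
  }
}

The AndClose suffix makes the transfer of ownership explicit at the call site, which is why each caller in the diffs below can delete its own cleanup code.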
@@ -67,10 +67,7 @@ abstract class GpuHashPartitioningBase(expressions: Seq[Expression], numPartitio
           partitionInternalAndClose(batch)
         }
       }
-      val ret = withResource(partitionColumns) { partitionColumns =>
-        sliceInternalGpuOrCpu(numRows, partitionIndexes, partitionColumns)
-      }
-      // Close the partition columns we copied them as a part of the slice
+      val ret = sliceInternalGpuOrCpuAndClose(numRows, partitionIndexes, partitionColumns)
       ret.zipWithIndex.filter(_._1 != null)
     }
   }
@@ -49,14 +49,17 @@ trait GpuPartitioning extends Partitioning with Arm {
     ret
   }
 
-  def sliceInternalOnGpu(numRows: Int, partitionIndexes: Array[Int],
+  def sliceInternalOnGpuAndClose(numRows: Int, partitionIndexes: Array[Int],
       partitionColumns: Array[GpuColumnVector]): Array[ColumnarBatch] = {
     // The first index will always be 0, so we need to skip it.
     val batches = if (numRows > 0) {
       val parts = partitionIndexes.slice(1, partitionIndexes.length)
       closeOnExcept(new ArrayBuffer[ColumnarBatch](numPartitions)) { splits =>
-        val table = new Table(partitionColumns.map(_.getBase).toArray: _*)
-        val contiguousTables = withResource(table)(t => t.contiguousSplit(parts: _*))
+        val contiguousTables = withResource(partitionColumns) { _ =>
+          withResource(new Table(partitionColumns.map(_.getBase).toArray: _*)) { table =>
+            table.contiguousSplit(parts: _*)
+          }
+        }
         GpuShuffleEnv.rapidsShuffleCodec match {
           case Some(codec) =>
             compressSplits(splits, codec, contiguousTables)
@@ -80,13 +83,15 @@
     batches
   }
 
-  def sliceInternalOnCpu(numRows: Int, partitionIndexes: Array[Int],
+  def sliceInternalOnCpuAndClose(numRows: Int, partitionIndexes: Array[Int],
       partitionColumns: Array[GpuColumnVector]): Array[ColumnarBatch] = {
     // We need to make sure that we have a null count calculated ahead of time.
     // This should be a temp work around.
     partitionColumns.foreach(_.getBase.getNullCount)
 
-    val hostPartColumns = partitionColumns.map(_.copyToHost())
+    val hostPartColumns = withResource(partitionColumns) { _ =>
+      partitionColumns.map(_.copyToHost())
+    }
     try {
       // Leaving the GPU for a while
       GpuSemaphore.releaseIfNecessary(TaskContext.get())
@@ -105,7 +110,7 @@
     }
   }
 
-  def sliceInternalGpuOrCpu(numRows: Int, partitionIndexes: Array[Int],
+  def sliceInternalGpuOrCpuAndClose(numRows: Int, partitionIndexes: Array[Int],
       partitionColumns: Array[GpuColumnVector]): Array[ColumnarBatch] = {
     val sliceOnGpu = usesGPUShuffle
     val nvtxRangeKey = if (sliceOnGpu) {
@@ -117,9 +122,9 @@
     // for large number of small splits.
     withResource(new NvtxRange(nvtxRangeKey, NvtxColor.CYAN)) { _ =>
       if (sliceOnGpu) {
-        sliceInternalOnGpu(numRows, partitionIndexes, partitionColumns)
+        sliceInternalOnGpuAndClose(numRows, partitionIndexes, partitionColumns)
       } else {
-        sliceInternalOnCpu(numRows, partitionIndexes, partitionColumns)
+        sliceInternalOnCpuAndClose(numRows, partitionIndexes, partitionColumns)
       }
     }
   }
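
Note that the GPU path above keeps closeOnExcept around the buffer of result batches: the batches are closed only if something throws part-way through, while on success they are handed back to the caller. A rough sketch of that pattern follows, under the assumption that the real Arm helper behaves this way (the plugin version also accepts collections, such as the ArrayBuffer of batches used above); CloseOnExceptSketch and Buf are hypothetical names.

object CloseOnExceptSketch {
  // Close the resource only when the body throws; on success, ownership stays with the caller.
  def closeOnExcept[T <: AutoCloseable, R](resource: T)(body: T => R): R =
    try {
      body(resource)
    } catch {
      case t: Throwable =>
        resource.close()
        throw t
    }

  final class Buf(val id: Int) extends AutoCloseable {
    override def close(): Unit = println(s"closed buf $id")
  }

  def main(args: Array[String]): Unit = {
    // Success: buf 1 is returned still open and the caller closes it later.
    val kept = closeOnExcept(new Buf(1)) { b => b }
    kept.close()
    // Failure: closeOnExcept closes buf 2 before rethrowing.
    try closeOnExcept(new Buf(2)) { _ => throw new RuntimeException("boom") }
    catch { case _: RuntimeException => println("body failed; buf 2 was already closed") }
  }
}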
@@ -220,19 +220,15 @@ case class GpuRangePartitioner(
   override def columnarEval(batch: ColumnarBatch): Any = {
     if (rangeBounds.nonEmpty) {
       val (parts, partitionColumns) = computeBoundsAndClose(batch)
-      withResource(partitionColumns) { partitionColumns =>
-        val slicedCb = sliceInternalGpuOrCpu(partitionColumns.head.getRowCount.toInt,
-          parts, partitionColumns)
-        slicedCb.zipWithIndex.filter(_._1 != null)
-      }
+      val slicedCb = sliceInternalGpuOrCpuAndClose(partitionColumns.head.getRowCount.toInt,
+        parts, partitionColumns)
+      slicedCb.zipWithIndex.filter(_._1 != null)
     } else {
-      withResource(batch) { cb =>
-        // Nothing needs to be sliced but a contiguous table is needed for GPU shuffle which
-        // slice will produce.
-        val sliced = sliceInternalGpuOrCpu(cb.numRows, Array(0),
-          GpuColumnVector.extractColumns(cb))
-        sliced.zipWithIndex.filter(_._1 != null)
-      }
+      // Nothing needs to be sliced but a contiguous table is needed for GPU shuffle which
+      // slice will produce.
+      val sliced = sliceInternalGpuOrCpuAndClose(batch.numRows, Array(0),
+        GpuColumnVector.extractColumns(batch))
+      sliced.zipWithIndex.filter(_._1 != null)
     }
   }
 }
@@ -19,7 +19,6 @@ package com.nvidia.spark.rapids
 import java.util.Random
 
 import ai.rapids.cudf.{NvtxColor, NvtxRange}
-import com.nvidia.spark.rapids.RapidsPluginImplicits._
 import com.nvidia.spark.rapids.shims.ShimExpression
 
 import org.apache.spark.TaskContext
@@ -78,9 +77,7 @@ case class GpuRoundRobinPartitioning(numPartitions: Int)
         }
       }
       val ret: Array[ColumnarBatch] =
-        sliceInternalGpuOrCpu(numRows, partitionIndexes, partitionColumns)
-      partitionColumns.safeClose()
-      // Close the partition columns we copied them as a part of the slice
+        sliceInternalGpuOrCpuAndClose(numRows, partitionIndexes, partitionColumns)
       ret.zipWithIndex.filter(_._1 != null)
     } finally {
       totalRange.close()
@@ -42,15 +42,13 @@ case object GpuSinglePartitioning extends GpuExpression with ShimExpression
     if (batch.numCols == 0) {
       Array(batch).zipWithIndex
     } else {
-      withResource(batch) { batch =>
-        // Nothing needs to be sliced but a contiguous table is needed for GPU shuffle which
-        // slice will produce.
-        val sliced = sliceInternalGpuOrCpu(
-          batch.numRows,
-          Array(0),
-          GpuColumnVector.extractColumns(batch))
-        sliced.zipWithIndex.filter(_._1 != null)
-      }
+      // Nothing needs to be sliced but a contiguous table is needed for GPU shuffle which
+      // slice will produce.
+      val sliced = sliceInternalGpuOrCpuAndClose(
+        batch.numRows,
+        Array(0),
+        GpuColumnVector.extractColumns(batch))
+      sliced.zipWithIndex.filter(_._1 != null)
     }
   }
 
@@ -118,9 +118,13 @@ class GpuPartitioningSuite extends FunSuite with Arm {
       override val numPartitions: Int = partitionIndices.length
     }
     withResource(buildBatch()) { batch =>
+      // `sliceInternalOnGpuAndClose` will close the batch, but in this test we want to
+      // reuse it
+      GpuColumnVector.incRefCounts(batch)
       val columns = GpuColumnVector.extractColumns(batch)
       val numRows = batch.numRows
-      withResource(gp.sliceInternalOnGpu(numRows, partitionIndices, columns)) { partitions =>
+      withResource(
+          gp.sliceInternalOnGpuAndClose(numRows, partitionIndices, columns)) { partitions =>
         partitions.zipWithIndex.foreach { case (partBatch, partIndex) =>
           val startRow = partitionIndices(partIndex)
           val endRow = if (partIndex < partitionIndices.length - 1) {
@@ -153,10 +157,14 @@
       override val numPartitions: Int = partitionIndices.length
     }
     withResource(buildBatch()) { batch =>
+      // `sliceInternalOnGpuAndClose` will close the batch, but in this test we want to
+      // reuse it
+      GpuColumnVector.incRefCounts(batch)
       val columns = GpuColumnVector.extractColumns(batch)
       val sparkTypes = GpuColumnVector.extractTypes(batch)
       val numRows = batch.numRows
-      withResource(gp.sliceInternalOnGpu(numRows, partitionIndices, columns)) { partitions =>
+      withResource(
+          gp.sliceInternalOnGpuAndClose(numRows, partitionIndices, columns)) { partitions =>
         partitions.zipWithIndex.foreach { case (partBatch, partIndex) =>
           val startRow = partitionIndices(partIndex)
           val endRow = if (partIndex < partitionIndices.length - 1) {
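
The incRefCounts calls in the test exist because sliceInternalOnGpuAndClose now closes the columns it is given, while the surrounding withResource(buildBatch()) will close the same batch again when the test finishes. A toy illustration of why taking an extra reference makes that double close safe follows; RefCountedBuffer is a hypothetical stand-in for cudf's reference-counted column vectors, not the real API.

object RefCountSketch {
  // Toy reference-counted buffer; the underlying memory is freed only when the count hits zero.
  final class RefCountedBuffer extends AutoCloseable {
    private var refs = 1
    def incRefCount(): this.type = { refs += 1; this }
    override def close(): Unit = {
      refs -= 1
      if (refs == 0) println("buffer actually freed")
      else println(s"close() called, $refs reference(s) still outstanding")
    }
  }

  def main(args: Array[String]): Unit = {
    val buf = new RefCountedBuffer
    buf.incRefCount() // second reference for the callee that closes whatever it is given
    buf.close()       // the "AndClose" callee releases its reference...
    buf.close()       // ...the test still holds one and closes it at the end; now it is freed
  }
}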