diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuHashPartitioningBase.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuHashPartitioningBase.scala
index d1a49c1d6f4e..6f456a239126 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuHashPartitioningBase.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuHashPartitioningBase.scala
@@ -67,9 +67,7 @@ abstract class GpuHashPartitioningBase(expressions: Seq[Expression], numPartitio
           partitionInternalAndClose(batch)
         }
       }
-      val ret = withResource(partitionColumns) { partitionColumns =>
-        sliceInternalGpuOrCpu(numRows, partitionIndexes, partitionColumns)
-      }
+      val ret = sliceInternalGpuOrCpuAndClose(numRows, partitionIndexes, partitionColumns)
       // Close the partition columns we copied them as a part of the slice
       ret.zipWithIndex.filter(_._1 != null)
     }
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala
index 390fc45d28ba..157f4bcee4f8 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala
@@ -49,14 +49,17 @@ trait GpuPartitioning extends Partitioning with Arm {
     ret
   }
 
-  def sliceInternalOnGpu(numRows: Int, partitionIndexes: Array[Int],
+  def sliceInternalOnGpuAndClose(numRows: Int, partitionIndexes: Array[Int],
       partitionColumns: Array[GpuColumnVector]): Array[ColumnarBatch] = {
     // The first index will always be 0, so we need to skip it.
     val batches = if (numRows > 0) {
       val parts = partitionIndexes.slice(1, partitionIndexes.length)
       closeOnExcept(new ArrayBuffer[ColumnarBatch](numPartitions)) { splits =>
-        val table = new Table(partitionColumns.map(_.getBase).toArray: _*)
-        val contiguousTables = withResource(table)(t => t.contiguousSplit(parts: _*))
+        val contiguousTables = withResource(partitionColumns) { _ =>
+          withResource(new Table(partitionColumns.map(_.getBase).toArray: _*)) { table =>
+            table.contiguousSplit(parts: _*)
+          }
+        }
         GpuShuffleEnv.rapidsShuffleCodec match {
           case Some(codec) =>
             compressSplits(splits, codec, contiguousTables)
@@ -80,13 +83,15 @@ trait GpuPartitioning extends Partitioning with Arm {
     batches
   }
 
-  def sliceInternalOnCpu(numRows: Int, partitionIndexes: Array[Int],
+  def sliceInternalOnCpuAndClose(numRows: Int, partitionIndexes: Array[Int],
       partitionColumns: Array[GpuColumnVector]): Array[ColumnarBatch] = {
     // We need to make sure that we have a null count calculated ahead of time.
     // This should be a temp work around.
     partitionColumns.foreach(_.getBase.getNullCount)
 
-    val hostPartColumns = partitionColumns.map(_.copyToHost())
+    val hostPartColumns = withResource(partitionColumns) { _ =>
+      partitionColumns.map(_.copyToHost())
+    }
     try {
       // Leaving the GPU for a while
       GpuSemaphore.releaseIfNecessary(TaskContext.get())
@@ -105,7 +110,7 @@
     }
   }
 
-  def sliceInternalGpuOrCpu(numRows: Int, partitionIndexes: Array[Int],
+  def sliceInternalGpuOrCpuAndClose(numRows: Int, partitionIndexes: Array[Int],
       partitionColumns: Array[GpuColumnVector]): Array[ColumnarBatch] = {
     val sliceOnGpu = usesGPUShuffle
     val nvtxRangeKey = if (sliceOnGpu) {
@@ -117,9 +122,9 @@
     // for large number of small splits.
     withResource(new NvtxRange(nvtxRangeKey, NvtxColor.CYAN)) { _ =>
       if (sliceOnGpu) {
-        sliceInternalOnGpu(numRows, partitionIndexes, partitionColumns)
+        sliceInternalOnGpuAndClose(numRows, partitionIndexes, partitionColumns)
       } else {
-        sliceInternalOnCpu(numRows, partitionIndexes, partitionColumns)
+        sliceInternalOnCpuAndClose(numRows, partitionIndexes, partitionColumns)
       }
     }
   }
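
The core of this change is the ownership convention above: the renamed `...AndClose` methods in `GpuPartitioning` now take ownership of `partitionColumns` and close them via `withResource` (from the plugin's `Arm` trait) even when slicing fails. The sketch below illustrates that convention in isolation; `Buffer`, `sumAndClose`, and the local `withResource` helper are hypothetical stand-ins, not the plugin's API.

```scala
// Minimal sketch of the "...AndClose" ownership convention, assuming a
// withResource helper modeled on the plugin's Arm trait. All names here are
// illustrative.
object AndCloseSketch {
  // Close every element, even if the body throws.
  def withResource[T <: AutoCloseable, V](rs: Seq[T])(body: Seq[T] => V): V = {
    try {
      body(rs)
    } finally {
      rs.foreach(_.close())
    }
  }

  final class Buffer(val id: Int) extends AutoCloseable {
    private var closed = false
    def read(): Int = { require(!closed, s"buffer $id already closed"); id }
    override def close(): Unit = { closed = true; println(s"closed buffer $id") }
  }

  // The "AndClose" suffix signals that this method takes ownership of `bufs`:
  // the caller must not use or close them afterwards.
  def sumAndClose(bufs: Seq[Buffer]): Int =
    withResource(bufs) { bs => bs.map(_.read()).sum }

  def main(args: Array[String]): Unit = {
    val total = sumAndClose(Seq(new Buffer(1), new Buffer(2)))
    println(s"sum = $total") // the buffers are already closed here
  }
}
```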
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRangePartitioner.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRangePartitioner.scala
index 99e1e907b8c5..94c303df3cb1 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRangePartitioner.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRangePartitioner.scala
@@ -220,19 +220,15 @@ case class GpuRangePartitioner(
   override def columnarEval(batch: ColumnarBatch): Any = {
     if (rangeBounds.nonEmpty) {
       val (parts, partitionColumns) = computeBoundsAndClose(batch)
-      withResource(partitionColumns) { partitionColumns =>
-        val slicedCb = sliceInternalGpuOrCpu(partitionColumns.head.getRowCount.toInt,
-          parts, partitionColumns)
-        slicedCb.zipWithIndex.filter(_._1 != null)
-      }
+      val slicedCb = sliceInternalGpuOrCpuAndClose(partitionColumns.head.getRowCount.toInt,
+        parts, partitionColumns)
+      slicedCb.zipWithIndex.filter(_._1 != null)
     } else {
-      withResource(batch) { cb =>
-        // Nothing needs to be sliced but a contiguous table is needed for GPU shuffle which
-        // slice will produce.
-        val sliced = sliceInternalGpuOrCpu(cb.numRows, Array(0),
-          GpuColumnVector.extractColumns(cb))
-        sliced.zipWithIndex.filter(_._1 != null)
-      }
+      // Nothing needs to be sliced but a contiguous table is needed for GPU shuffle which
+      // slice will produce.
+      val sliced = sliceInternalGpuOrCpuAndClose(batch.numRows, Array(0),
+        GpuColumnVector.extractColumns(batch))
+      sliced.zipWithIndex.filter(_._1 != null)
     }
   }
 }
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRoundRobinPartitioning.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRoundRobinPartitioning.scala
index 2f25bf2ec6d8..10def4a63606 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRoundRobinPartitioning.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRoundRobinPartitioning.scala
@@ -19,7 +19,6 @@ package com.nvidia.spark.rapids
 import java.util.Random
 
 import ai.rapids.cudf.{NvtxColor, NvtxRange}
-import com.nvidia.spark.rapids.RapidsPluginImplicits._
 import com.nvidia.spark.rapids.shims.ShimExpression
 
 import org.apache.spark.TaskContext
@@ -78,8 +77,7 @@ case class GpuRoundRobinPartitioning(numPartitions: Int)
         }
       }
       val ret: Array[ColumnarBatch] =
-        sliceInternalGpuOrCpu(numRows, partitionIndexes, partitionColumns)
-      partitionColumns.safeClose()
+        sliceInternalGpuOrCpuAndClose(numRows, partitionIndexes, partitionColumns)
       // Close the partition columns we copied them as a part of the slice
       ret.zipWithIndex.filter(_._1 != null)
     } finally {
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSinglePartitioning.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSinglePartitioning.scala
index c1422d16072c..3d12eb4b16a4 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSinglePartitioning.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSinglePartitioning.scala
@@ -42,15 +42,13 @@ case object GpuSinglePartitioning extends GpuExpression with ShimExpression
     if (batch.numCols == 0) {
       Array(batch).zipWithIndex
     } else {
-      withResource(batch) { batch =>
-        // Nothing needs to be sliced but a contiguous table is needed for GPU shuffle which
-        // slice will produce.
-        val sliced = sliceInternalGpuOrCpu(
-          batch.numRows,
-          Array(0),
-          GpuColumnVector.extractColumns(batch))
-        sliced.zipWithIndex.filter(_._1 != null)
-      }
+      // Nothing needs to be sliced but a contiguous table is needed for GPU shuffle which
+      // slice will produce.
+      val sliced = sliceInternalGpuOrCpuAndClose(
+        batch.numRows,
+        Array(0),
+        GpuColumnVector.extractColumns(batch))
+      sliced.zipWithIndex.filter(_._1 != null)
     }
   }
 
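
Each call site above is updated to match the new contract: `GpuHashPartitioningBase` and `GpuRangePartitioner` drop their wrapping `withResource`, `GpuSinglePartitioning` drops `withResource(batch)`, and `GpuRoundRobinPartitioning` additionally drops `partitionColumns.safeClose()` (and its now-unused `RapidsPluginImplicits` import), since a caller-side close after an `AndClose` call would release the columns a second time. A hypothetical sketch of that failure mode, with illustrative names rather than the plugin's classes:

```scala
// Sketch of the call-site change: once a method takes ownership ("AndClose"),
// the caller must drop its own cleanup or the resource is closed twice.
// Buffer and sliceAndClose are illustrative stand-ins.
object CallSiteSketch {
  final class Buffer(val id: Int) extends AutoCloseable {
    private var closed = false
    override def close(): Unit = {
      if (closed) throw new IllegalStateException(s"buffer $id closed twice")
      closed = true
    }
  }

  // Takes ownership of its inputs, like the ...AndClose methods in the diff,
  // closing them even if producing the result throws.
  def sliceAndClose(bufs: Array[Buffer]): Array[Int] =
    try bufs.map(_.id) finally bufs.foreach(_.close())

  def main(args: Array[String]): Unit = {
    val bufs = Array(new Buffer(1), new Buffer(2))
    val ids = sliceAndClose(bufs) // closes bufs
    // bufs.foreach(_.close())    // the old caller-side close: would now throw
    println(ids.mkString(","))
  }
}
```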
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuPartitioningSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuPartitioningSuite.scala
index 713d2542e33b..fc5ccdd9775d 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuPartitioningSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuPartitioningSuite.scala
@@ -118,9 +118,13 @@ class GpuPartitioningSuite extends FunSuite with Arm {
       override val numPartitions: Int = partitionIndices.length
     }
     withResource(buildBatch()) { batch =>
+      // `sliceInternalOnGpuAndClose` will close the batch, but in this test we want to
+      // reuse it
+      GpuColumnVector.incRefCounts(batch)
       val columns = GpuColumnVector.extractColumns(batch)
       val numRows = batch.numRows
-      withResource(gp.sliceInternalOnGpu(numRows, partitionIndices, columns)) { partitions =>
+      withResource(
+          gp.sliceInternalOnGpuAndClose(numRows, partitionIndices, columns)) { partitions =>
         partitions.zipWithIndex.foreach { case (partBatch, partIndex) =>
           val startRow = partitionIndices(partIndex)
           val endRow = if (partIndex < partitionIndices.length - 1) {
@@ -153,10 +157,14 @@ class GpuPartitioningSuite extends FunSuite with Arm {
       override val numPartitions: Int = partitionIndices.length
     }
     withResource(buildBatch()) { batch =>
+      // `sliceInternalOnGpuAndClose` will close the batch, but in this test we want to
+      // reuse it
+      GpuColumnVector.incRefCounts(batch)
       val columns = GpuColumnVector.extractColumns(batch)
       val sparkTypes = GpuColumnVector.extractTypes(batch)
       val numRows = batch.numRows
-      withResource(gp.sliceInternalOnGpu(numRows, partitionIndices, columns)) { partitions =>
+      withResource(
+          gp.sliceInternalOnGpuAndClose(numRows, partitionIndices, columns)) { partitions =>
         partitions.zipWithIndex.foreach { case (partBatch, partIndex) =>
           val startRow = partitionIndices(partIndex)
           val endRow = if (partIndex < partitionIndices.length - 1) {
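
The suite builds a batch once and keeps inspecting it after slicing, so it bumps the batch's reference count with `GpuColumnVector.incRefCounts(batch)` before handing it to `sliceInternalOnGpuAndClose`; the close inside the slice then only releases the reference the callee owns. Below is a minimal sketch of that ref-counting idea, assuming a simple counter; `RefCounted` and `consumeAndClose` are illustrative, not cudf's actual column classes.

```scala
// Sketch of why the tests call incRefCounts before an "AndClose" method:
// the extra reference keeps the resource alive for later assertions.
object RefCountSketch {
  final class RefCounted(name: String) extends AutoCloseable {
    private var refs = 1
    def incRefCount(): this.type = { refs += 1; this }
    def isAlive: Boolean = refs > 0
    override def close(): Unit = {
      require(refs > 0, s"$name closed too many times")
      refs -= 1
      if (refs == 0) println(s"$name freed")
    }
  }

  // Takes ownership of one reference, like the slice methods in the diff.
  def consumeAndClose(r: RefCounted): Unit = r.close()

  def main(args: Array[String]): Unit = {
    val batch = new RefCounted("batch")
    batch.incRefCount()    // keep one reference for ourselves
    consumeAndClose(batch) // the callee releases its reference
    assert(batch.isAlive)  // still usable here
    batch.close()          // now it is actually freed
  }
}
```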