Register default allocator for host memory #9908

Merged: 5 commits, Dec 4, 2023
Changes from 2 commits
@@ -22,7 +22,7 @@ import scala.collection.mutable.Queue
import ai.rapids.cudf.{Cuda, HostColumnVector, NvtxColor, Table}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.splitSpillableInHalfByRows
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitSpillableInHalfByRows, withRetryNoSplit}
import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion
import com.nvidia.spark.rapids.jni.RowConversion
import com.nvidia.spark.rapids.shims.ShimUnaryExecNode
@@ -258,18 +258,21 @@ class ColumnarToRowIterator(batches: Iterator[ColumnarBatch],
// perform conversion
try {
devCb.foreach { devCb =>
withResource(devCb) { _ =>
withResource(new NvtxWithMetrics("ColumnarToRow: batch", NvtxColor.RED, opTime)) { _ =>
cb = new ColumnarBatch(GpuColumnVector.extractColumns(devCb).safeMap(toHost),
devCb.numRows())
it = cb.rowIterator()
// In order to match the numOutputRows metric in the generated code we update
// numOutputRows for each batch. This is less accurate than doing it at output
// because it will over count the number of rows output in the case of a limit,
// but it is more efficient.
numOutputRows += cb.numRows()
val sDevCb = SpillableColumnarBatch(devCb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
cb = withRetryNoSplit(sDevCb) { _ =>
withResource(sDevCb.getColumnarBatch()) { devCb =>
withResource(new NvtxWithMetrics("ColumnarToRow: batch", NvtxColor.RED, opTime)) { _ =>
new ColumnarBatch(GpuColumnVector.extractColumns(devCb).safeMap(toHost),
devCb.numRows())
}
}
}
it = cb.rowIterator()
// In order to match the numOutputRows metric in the generated code we update
// numOutputRows for each batch. This is less accurate than doing it at output
// because it will over count the number of rows output in the case of a limit,
// but it is more efficient.
numOutputRows += cb.numRows()
}
} finally {
// Leaving the GPU for a while: if this iterator is configured to release
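The new code wraps the device batch in a SpillableColumnarBatch and performs the device-to-host copy inside withRetryNoSplit, so the copy can be retried (after the batch has potentially been spilled) when host memory is temporarily exhausted. Below is a condensed sketch of that pattern, not part of the diff: copyToHostWithRetry and its toHost parameter are hypothetical, and the sketch assumes it sits in the com.nvidia.spark.rapids package so GpuColumnVector, SpillableColumnarBatch, and SpillPriorities resolve.

import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit

import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

// Hypothetical helper condensing the change above: copy one device batch to host,
// retrying the copy if a retry-OOM is thrown while allocating host memory.
def copyToHostWithRetry(
    devCb: ColumnarBatch,
    toHost: GpuColumnVector => ColumnVector): ColumnarBatch = {
  // Make the device batch spillable so a retry attempt can free GPU memory first.
  val spillable = SpillableColumnarBatch(devCb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
  withRetryNoSplit(spillable) { _ =>
    // Re-materialize the batch on each attempt; it may have been spilled meanwhile.
    withResource(spillable.getColumnarBatch()) { dev =>
      new ColumnarBatch(GpuColumnVector.extractColumns(dev).safeMap(toHost), dev.numRows())
    }
  }
}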
@@ -450,10 +450,8 @@ object GpuDeviceManager extends Logging {
logInfo(s"Initializing pinned memory pool (${pinnedSize / 1024 / 1024.0} MiB)")
PinnedMemoryPool.initialize(pinnedSize, gpuId)
}
if (nonPinnedLimit >= 0) {
// Host memory limits must be set after the pinned memory pool is initialized
HostAlloc.initialize(nonPinnedLimit)
}
// Host memory limits must be set after the pinned memory pool is initialized
HostAlloc.initialize(nonPinnedLimit)
}

/**
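With the nonPinnedLimit guard removed, HostAlloc.initialize now runs unconditionally, so the default host allocator is registered even when no non-pinned limit is configured; it still has to run after PinnedMemoryPool.initialize because HostAlloc reads the pinned pool size. A minimal sketch of the resulting ordering, not part of the diff: gpuId, pinnedSize, and nonPinnedLimit are illustrative placeholders, and treating a negative limit as "no explicit limit" is an assumption.

// Illustrative initialization order only; real values come from the plugin config.
val gpuId = 0
val pinnedSize = 1024L * 1024 * 1024   // hypothetical 1 GiB pinned pool
val nonPinnedLimit = -1L               // hypothetical "no explicit limit" value

if (pinnedSize > 0) {
  // Pinned pool first: HostAlloc reads PinnedMemoryPool.getTotalPoolSizeBytes.
  PinnedMemoryPool.initialize(pinnedSize, gpuId)
}
// Always initialize HostAlloc so DefaultHostMemoryAllocator.set(...) is called,
// even when no non-pinned limit was configured.
HostAlloc.initialize(nonPinnedLimit)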
@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
import ai.rapids.cudf.{ContiguousTable, Cuda, NvtxColor, NvtxRange, Table}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit

import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
@@ -126,7 +127,9 @@ trait GpuPartitioning extends Partitioning {
val mightNeedToSplit = totalInputSize > GpuPartitioning.MaxCpuBatchSize

val hostPartColumns = withResource(partitionColumns) { _ =>
partitionColumns.map(_.copyToHost())
withRetryNoSplit {
partitionColumns.safeMap(_.copyToHost())
}
}
try {
// Leaving the GPU for a while
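Here the no-argument form of withRetryNoSplit re-runs the whole block if a retry-OOM is thrown while copying partitions to the host, and safeMap (from RapidsPluginImplicits) closes any host columns it has already produced when a later copy fails. A hedged sketch of the same pattern, not part of the diff: copyPartitionsToHost is a hypothetical helper and the plugin package is assumed to be in scope.

import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit

// Hypothetical helper mirroring the hunk above: copy each partition column to host,
// retrying the whole copy on a retry-OOM, and always closing the device columns.
def copyPartitionsToHost(partitionColumns: Array[GpuColumnVector]) =
  withResource(partitionColumns) { _ =>
    withRetryNoSplit {
      // safeMap closes any host columns already produced if a later copy throws.
      partitionColumns.safeMap(_.copyToHost())
    }
  }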
@@ -16,12 +16,12 @@

package com.nvidia.spark.rapids

import ai.rapids.cudf.{HostMemoryBuffer, MemoryBuffer, PinnedMemoryPool}
import ai.rapids.cudf.{DefaultHostMemoryAllocator, HostMemoryAllocator, HostMemoryBuffer, MemoryBuffer, PinnedMemoryPool}
import com.nvidia.spark.rapids.jni.RmmSpark

import org.apache.spark.internal.Logging

private class HostAlloc(nonPinnedLimit: Long) extends Logging {
private class HostAlloc(nonPinnedLimit: Long) extends HostMemoryAllocator with Logging {
private var currentNonPinnedAllocated: Long = 0L
private val pinnedLimit: Long = PinnedMemoryPool.getTotalPoolSizeBytes
// For now we are going to assume that we are the only ones calling into the pinned pool
@@ -219,6 +219,12 @@ private class HostAlloc(nonPinnedLimit: Long) extends Logging {
}
ret.get
}

override def allocate(amount: Long, preferPinned: Boolean): HostMemoryBuffer =
alloc(amount, preferPinned)

override def allocate(amount: Long): HostMemoryBuffer =
alloc(amount)
}

/**
@@ -233,6 +239,7 @@ object HostAlloc {

def initialize(nonPinnedLimit: Long): Unit = synchronized {
singleton = new HostAlloc(nonPinnedLimit)
DefaultHostMemoryAllocator.set(singleton)
}

def tryAlloc(amount: Long, preferPinned: Boolean = true): Option[HostMemoryBuffer] = {
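Registering the HostAlloc singleton as cudf's default host memory allocator is the core of this PR: host allocations that go through the default allocator are now subject to the plugin's pinned and non-pinned limits and its retry bookkeeping. A hedged sketch of what that means for a caller, not part of the diff: DefaultHostMemoryAllocator.get() is assumed to return the currently registered allocator, and the 1 KiB size is illustrative.

import ai.rapids.cudf.DefaultHostMemoryAllocator
import com.nvidia.spark.rapids.Arm.withResource

// After HostAlloc.initialize(...) has run, the plugin's allocator is the default,
// so this allocation is tracked against the host limits rather than going to an
// untracked malloc.
withResource(DefaultHostMemoryAllocator.get().allocate(1024L)) { buf =>
  buf.setLong(0, 42L) // use the buffer like any other HostMemoryBuffer
}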
@@ -18,7 +18,7 @@ package com.nvidia.spark.rapids

import ai.rapids.cudf.Table
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.jni.{GpuSplitAndRetryOOM, RmmSpark}
import com.nvidia.spark.rapids.jni.{CpuSplitAndRetryOOM, RmmSpark}
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{doAnswer, spy, times, verify}
import org.mockito.invocation.InvocationOnMock
@@ -97,7 +97,7 @@ class GeneratedInternalRowToCudfRowIteratorRetrySuite
TestUtils.compareBatches(expected, devBatch)
}
}
assertResult(5)(getAndResetNumRetryThrowCurrentTask)
assertResult(6)(getAndResetNumRetryThrowCurrentTask)
assert(!myIter.hasNext)
assertResult(0)(RapidsBufferCatalog.getDeviceStorage.currentSize)
// This is my wrap around of checking that we did retry the last part
@@ -141,7 +141,7 @@ class GeneratedInternalRowToCudfRowIteratorRetrySuite
TestUtils.compareBatches(expected, devBatch)
}
}
assertResult(5)(getAndResetNumRetryThrowCurrentTask)
assertResult(6)(getAndResetNumRetryThrowCurrentTask)
assert(!myIter.hasNext)
assertResult(0)(RapidsBufferCatalog.getDeviceStorage.currentSize)
// This is my wrap around of checking that we did retry the last part
@@ -164,7 +164,7 @@
ctriter, schema, TargetSize(1),
NoopMetric, NoopMetric, NoopMetric, NoopMetric, NoopMetric)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
assertThrows[GpuSplitAndRetryOOM] {
assertThrows[CpuSplitAndRetryOOM] {
myIter.next()
}
assertResult(0)(RapidsBufferCatalog.getDeviceStorage.currentSize)
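The expected exception switches from GpuSplitAndRetryOOM to CpuSplitAndRetryOOM, reflecting that the forced OOM is now hit during a host (CPU) allocation routed through HostAlloc, and the retry counts go from 5 to 6, consistent with an additional retryable allocation point on the host path. A condensed sketch of the injection pattern these tests use, not part of the diff:

// Force the next allocation on this thread to fail with a split-and-retry OOM,
// then expect the host-side variant now that host memory goes through HostAlloc.
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
assertThrows[CpuSplitAndRetryOOM] {
  myIter.next()
}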
@@ -23,7 +23,7 @@ import com.nvidia.spark.rapids.GpuColumnVector.GpuColumnarBatchBuilder
import org.apache.spark.sql.types.{BinaryType, StringType, StructField, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch

class GpuColumnarToRowSuite extends SparkQueryCompareTestSuite {
class GpuColumnarToRowSuite extends RmmSparkRetrySuiteBase {
test("iterate past empty input batches") {
val batchIter: Iterator[ColumnarBatch] = new Iterator[ColumnarBatch] {
private[this] var batchCount = 0