Register default allocator for host memory #9908

Merged · 5 commits · Dec 4, 2023
@@ -22,7 +22,7 @@ import scala.collection.mutable.Queue
import ai.rapids.cudf.{Cuda, HostColumnVector, NvtxColor, Table}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.splitSpillableInHalfByRows
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitSpillableInHalfByRows, withRetryNoSplit}
import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion
import com.nvidia.spark.rapids.jni.RowConversion
import com.nvidia.spark.rapids.shims.ShimUnaryExecNode
@@ -258,18 +258,21 @@ class ColumnarToRowIterator(batches: Iterator[ColumnarBatch],
// perform conversion
try {
devCb.foreach { devCb =>
withResource(devCb) { _ =>
withResource(new NvtxWithMetrics("ColumnarToRow: batch", NvtxColor.RED, opTime)) { _ =>
cb = new ColumnarBatch(GpuColumnVector.extractColumns(devCb).safeMap(toHost),
devCb.numRows())
it = cb.rowIterator()
// In order to match the numOutputRows metric in the generated code we update
// numOutputRows for each batch. This is less accurate than doing it at output
// because it will over count the number of rows output in the case of a limit,
// but it is more efficient.
numOutputRows += cb.numRows()
val sDevCb = SpillableColumnarBatch(devCb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
cb = withRetryNoSplit(sDevCb) { _ =>
withResource(sDevCb.getColumnarBatch()) { devCb =>
withResource(new NvtxWithMetrics("ColumnarToRow: batch", NvtxColor.RED, opTime)) { _ =>
new ColumnarBatch(GpuColumnVector.extractColumns(devCb).safeMap(toHost),
devCb.numRows())
}
}
}
it = cb.rowIterator()
// In order to match the numOutputRows metric in the generated code we update
// numOutputRows for each batch. This is less accurate than doing it at output
// because it will over count the number of rows output in the case of a limit,
// but it is more efficient.
numOutputRows += cb.numRows()
}
} finally {
// Leaving the GPU for a while: if this iterator is configured to release
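The conversion now wraps the device batch in a SpillableColumnarBatch and runs the host copy inside withRetryNoSplit, so the batch can be spilled and the copy re-attempted when host memory is temporarily exhausted. A rough sketch of the same pattern in isolation; the helper name is made up and ownership of the incoming batch is assumed to transfer to the helper:

```scala
import com.nvidia.spark.rapids.{GpuColumnVector, SpillableColumnarBatch, SpillPriorities}
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit

import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

object CopyToHostExample {
  // Hypothetical helper: copy a device batch to host, retrying on host OOM.
  // Ownership of `devCb` moves to the spillable wrapper, which withRetryNoSplit
  // closes once the block finally succeeds or gives up.
  def copyBatchToHostWithRetry(devCb: ColumnarBatch): ColumnarBatch = {
    val spillable = SpillableColumnarBatch(devCb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
    withRetryNoSplit(spillable) { _ =>
      // Re-materialize the device batch on every attempt, since it may have
      // been spilled and restored between retries.
      withResource(spillable.getColumnarBatch()) { db =>
        val hostCols = GpuColumnVector.extractColumns(db).map(_.copyToHost(): ColumnVector)
        new ColumnarBatch(hostCols, db.numRows())
      }
    }
  }
}
```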
@@ -450,10 +450,8 @@ object GpuDeviceManager extends Logging {
logInfo(s"Initializing pinned memory pool (${pinnedSize / 1024 / 1024.0} MiB)")
PinnedMemoryPool.initialize(pinnedSize, gpuId)
}
if (nonPinnedLimit >= 0) {
// Host memory limits must be set after the pinned memory pool is initialized
HostAlloc.initialize(nonPinnedLimit)
}
// Host memory limits must be set after the pinned memory pool is initialized
HostAlloc.initialize(nonPinnedLimit)
}

/**
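With the nonPinnedLimit guard gone, HostAlloc is now initialized unconditionally, which also guarantees the default-allocator registration shown further down always happens. The ordering still matters because HostAlloc snapshots the pinned pool's size when it is constructed. A minimal sketch of that ordering, with made-up sizes and a hypothetical wrapper object:

```scala
import ai.rapids.cudf.PinnedMemoryPool
import com.nvidia.spark.rapids.HostAlloc

object HostMemoryInitExample {
  // Sizes and device id are made up; the point is only the ordering.
  def init(gpuId: Int): Unit = {
    val pinnedSize = 512L * 1024 * 1024          // hypothetical 512 MiB pinned pool
    val nonPinnedLimit = 4L * 1024 * 1024 * 1024 // hypothetical 4 GiB non-pinned limit

    // The pinned pool must exist before HostAlloc snapshots its total size.
    PinnedMemoryPool.initialize(pinnedSize, gpuId)
    HostAlloc.initialize(nonPinnedLimit)
  }
}
```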
@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
import ai.rapids.cudf.{ContiguousTable, Cuda, NvtxColor, NvtxRange, Table}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit

import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
@@ -126,7 +127,9 @@ trait GpuPartitioning extends Partitioning {
val mightNeedToSplit = totalInputSize > GpuPartitioning.MaxCpuBatchSize

val hostPartColumns = withResource(partitionColumns) { _ =>
partitionColumns.map(_.copyToHost())
withRetryNoSplit {
partitionColumns.safeMap(_.copyToHost())
}
}
try {
// Leaving the GPU for a while
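GpuPartitioning's device-to-host copy of the partitioned columns now goes through the same retry machinery, here in its thunk form: there is no spillable input, the whole block is simply re-run on a retry OOM. A sketch under assumed types; the object and helper names are invented:

```scala
import com.nvidia.spark.rapids.GpuColumnVector
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.RapidsPluginImplicits._ // brings safeMap into scope
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit

object PartitionCopyExample {
  // Hypothetical helper mirroring the GpuPartitioning change: copy every device
  // column to host, retrying the whole set of copies if host memory runs out.
  def copyAllToHost(cols: Array[GpuColumnVector]) =
    withResource(cols) { _ => // the device columns are closed once the copies are done
      withRetryNoSplit {
        // safeMap closes any host columns it already produced if a later copy
        // throws, so each retry attempt starts from a clean slate.
        cols.safeMap(_.copyToHost())
      }
    }
}
```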
25 changes: 20 additions & 5 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostAlloc.scala
@@ -16,12 +16,12 @@

package com.nvidia.spark.rapids

import ai.rapids.cudf.{HostMemoryBuffer, MemoryBuffer, PinnedMemoryPool}
import com.nvidia.spark.rapids.jni.RmmSpark
import ai.rapids.cudf.{DefaultHostMemoryAllocator, HostMemoryAllocator, HostMemoryBuffer, MemoryBuffer, PinnedMemoryPool}
import com.nvidia.spark.rapids.jni.{CpuRetryOOM, RmmSpark}

import org.apache.spark.internal.Logging

private class HostAlloc(nonPinnedLimit: Long) extends Logging {
private class HostAlloc(nonPinnedLimit: Long) extends HostMemoryAllocator with Logging {
private var currentNonPinnedAllocated: Long = 0L
private val pinnedLimit: Long = PinnedMemoryPool.getTotalPoolSizeBytes
// For now we are going to assume that we are the only ones calling into the pinned pool
@@ -137,7 +137,7 @@ private class HostAlloc(nonPinnedLimit: Long) extends Logging {
s"$storeSize total and $storeSpillableSize spillable bytes. $attemptMsg.")
if (storeSpillableSize == 0) {
logWarning(s"Host store exhausted, unable to allocate $allocSize bytes. " +
s"Total RMM allocated is $totalSize bytes.")
s"Total host allocated is $totalSize bytes.")
false
} else {
val targetSize = Math.max(storeSpillableSize - allocSize, 0)
@@ -213,12 +213,26 @@ private class HostAlloc(nonPinnedLimit: Long) extends Logging {
def alloc(amount: Long, preferPinned: Boolean = true): HostMemoryBuffer = {
checkSize(amount, preferPinned)
var ret = Option.empty[HostMemoryBuffer]
while (ret.isEmpty) {
var count = 0
while (ret.isEmpty && count < 1000) {
val (r, _) = tryAllocInternal(amount, preferPinned, blocking = true)
ret = r
count += 1
}
if (ret.isEmpty) {
// This can happen if someone broke the rules and not all host memory is
// spillable when doing an allocation, like if not all of the code has
// been updated yet.
throw new CpuRetryOOM("Could not complete allocation after 1000 retries")
}
ret.get
}

override def allocate(amount: Long, preferPinned: Boolean): HostMemoryBuffer =
alloc(amount, preferPinned)

override def allocate(amount: Long): HostMemoryBuffer =
alloc(amount)
}

/**
@@ -233,6 +247,7 @@ object HostAlloc {

def initialize(nonPinnedLimit: Long): Unit = synchronized {
singleton = new HostAlloc(nonPinnedLimit)
DefaultHostMemoryAllocator.set(singleton)
}

def tryAlloc(amount: Long, preferPinned: Boolean = true): Option[HostMemoryBuffer] = {
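This is the heart of the PR: HostAlloc implements cudf's HostMemoryAllocator interface and installs itself as the process-wide default, so host allocations requested through cudf's Java APIs respect the plugin's limits and retry behavior. Below is a standalone sketch of the registration mechanism using a toy allocator, assuming HostMemoryBuffer.allocate remains the raw allocation primitive and does not itself consult the default allocator:

```scala
import java.util.concurrent.atomic.AtomicLong

import ai.rapids.cudf.{DefaultHostMemoryAllocator, HostMemoryAllocator, HostMemoryBuffer}

// Toy allocator, for illustration only: counts the bytes requested and then
// falls back to cudf's plain host allocation.
class CountingHostAllocator extends HostMemoryAllocator {
  val requestedBytes = new AtomicLong(0L)

  override def allocate(amount: Long, preferPinned: Boolean): HostMemoryBuffer = {
    requestedBytes.addAndGet(amount)
    HostMemoryBuffer.allocate(amount, preferPinned)
  }

  override def allocate(amount: Long): HostMemoryBuffer = allocate(amount, true)
}

object RegisterAllocatorExample {
  def register(): Unit = {
    // After this call, cudf code paths that do not take an explicit allocator
    // route their host memory requests through CountingHostAllocator.
    DefaultHostMemoryAllocator.set(new CountingHostAllocator)
  }
}
```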
@@ -18,7 +18,7 @@ package com.nvidia.spark.rapids

import ai.rapids.cudf.Table
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.jni.{GpuSplitAndRetryOOM, RmmSpark}
import com.nvidia.spark.rapids.jni.{CpuSplitAndRetryOOM, RmmSpark}
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{doAnswer, spy, times, verify}
import org.mockito.invocation.InvocationOnMock
@@ -97,7 +97,7 @@ class GeneratedInternalRowToCudfRowIteratorRetrySuite
TestUtils.compareBatches(expected, devBatch)
}
}
assertResult(5)(getAndResetNumRetryThrowCurrentTask)
assertResult(6)(getAndResetNumRetryThrowCurrentTask)
assert(!myIter.hasNext)
assertResult(0)(RapidsBufferCatalog.getDeviceStorage.currentSize)
// This is my wrap around of checking that we did retry the last part
@@ -141,7 +141,7 @@ class GeneratedInternalRowToCudfRowIteratorRetrySuite
TestUtils.compareBatches(expected, devBatch)
}
}
assertResult(5)(getAndResetNumRetryThrowCurrentTask)
assertResult(6)(getAndResetNumRetryThrowCurrentTask)
assert(!myIter.hasNext)
assertResult(0)(RapidsBufferCatalog.getDeviceStorage.currentSize)
// This is my wrap around of checking that we did retry the last part
@@ -164,7 +164,7 @@ class GeneratedInternalRowToCudfRowIteratorRetrySuite
ctriter, schema, TargetSize(1),
NoopMetric, NoopMetric, NoopMetric, NoopMetric, NoopMetric)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
assertThrows[GpuSplitAndRetryOOM] {
assertThrows[CpuSplitAndRetryOOM] {
myIter.next()
}
assertResult(0)(RapidsBufferCatalog.getDeviceStorage.currentSize)
@@ -23,7 +23,7 @@ import com.nvidia.spark.rapids.GpuColumnVector.GpuColumnarBatchBuilder
import org.apache.spark.sql.types.{BinaryType, StringType, StructField, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch

class GpuColumnarToRowSuite extends SparkQueryCompareTestSuite {
class GpuColumnarToRowSuite extends RmmSparkRetrySuiteBase {
test("iterate past empty input batches") {
val batchIter: Iterator[ColumnarBatch] = new Iterator[ColumnarBatch] {
private[this] var batchCount = 0
@@ -24,9 +24,9 @@ import org.scalatest.funsuite.AnyFunSuite

import org.apache.spark.sql.SparkSession

class RmmSparkRetrySuiteBase extends AnyFunSuite with BeforeAndAfterEach {
trait RmmSparkRetrySuiteBase extends AnyFunSuite with BeforeAndAfterEach {
private var rmmWasInitialized = false
protected var deviceStorage: RapidsDeviceMemoryStore = null
protected var deviceStorage: RapidsDeviceMemoryStore = _

override def beforeEach(): Unit = {
super.beforeEach()
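Turning RmmSparkRetrySuiteBase into a trait lets a suite keep its existing base class and stack the RMM retry setup on top, which is what InternalColumnarRDDConverterSuite does below. A minimal sketch with a made-up suite name:

```scala
import com.nvidia.spark.rapids.{RmmSparkRetrySuiteBase, SparkQueryCompareTestSuite}

// Hypothetical suite: keeps the comparison-test base class and stacks the RMM
// retry setup on top of it via the new trait.
class HostRetrySmokeSuite extends SparkQueryCompareTestSuite with RmmSparkRetrySuiteBase {
  test("placeholder: retry plumbing is set up by beforeEach") {
    // real tests would exercise host allocation and retry paths here
    succeed
  }
}
```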
@@ -18,17 +18,19 @@ package org.apache.spark.sql.rapids.execution

import scala.collection.mutable

import com.nvidia.spark.rapids.{ColumnarRdd, ColumnarToRowIterator, GpuBatchUtilsSuite, GpuColumnVectorUtils, NoopMetric, RapidsHostColumnVector, SparkQueryCompareTestSuite, TestResourceFinder}
import com.nvidia.spark.rapids.{ColumnarRdd, ColumnarToRowIterator, GpuBatchUtilsSuite, GpuColumnVectorUtils, NoopMetric, RapidsHostColumnVector, RmmSparkRetrySuiteBase, SparkQueryCompareTestSuite, TestResourceFinder}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.GpuColumnVector.GpuColumnarBatchBuilder
import org.scalatest.Assertion

import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.util.MapData
import org.apache.spark.sql.types._

class InternalColumnarRDDConverterSuite extends SparkQueryCompareTestSuite {
class InternalColumnarRDDConverterSuite extends SparkQueryCompareTestSuite
with RmmSparkRetrySuiteBase {

def compareMapAndMapDate[K,V](map: collection.Map[K, V], mapData: MapData) = {
def compareMapAndMapDate[K,V](map: collection.Map[K, V], mapData: MapData): Assertion = {
assert(map.size == mapData.numElements())
val outputMap = mutable.Map[Any, Any]()
// Only String now, TODO: support other data types in Map
@@ -46,9 +48,9 @@ class InternalColumnarRDDConverterSuite extends SparkQueryCompareTestSuite {
withResource(new GpuColumnarBatchBuilder(schema, numRows)) { batchBuilder =>
val extR2CConverter = new GpuExternalRowToColumnConverter(schema)
rows.foreach(extR2CConverter.convert(_, batchBuilder))
closeOnExcept(batchBuilder.build(numRows)) { columnarBatch =>
val c2rIterator = new ColumnarToRowIterator(Iterator(columnarBatch),
NoopMetric, NoopMetric, NoopMetric, NoopMetric)
val columnarBatch = batchBuilder.build(numRows)
withResource(new ColumnarToRowIterator(Iterator(columnarBatch),
NoopMetric, NoopMetric, NoopMetric, NoopMetric)) { c2rIterator =>
rows.foreach { input =>
val output = c2rIterator.next()
if (input.isNullAt(0)) {