Register default allocator for host memory (#9908)
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
revans2 authored Dec 4, 2023
1 parent 02e7d40 commit 410647c
Showing 8 changed files with 55 additions and 34 deletions.
@@ -22,7 +22,7 @@ import scala.collection.mutable.Queue
import ai.rapids.cudf.{Cuda, HostColumnVector, NvtxColor, Table}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.splitSpillableInHalfByRows
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitSpillableInHalfByRows, withRetryNoSplit}
import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion
import com.nvidia.spark.rapids.jni.RowConversion
import com.nvidia.spark.rapids.shims.ShimUnaryExecNode
@@ -258,18 +258,21 @@ class ColumnarToRowIterator(batches: Iterator[ColumnarBatch],
// perform conversion
try {
devCb.foreach { devCb =>
withResource(devCb) { _ =>
withResource(new NvtxWithMetrics("ColumnarToRow: batch", NvtxColor.RED, opTime)) { _ =>
cb = new ColumnarBatch(GpuColumnVector.extractColumns(devCb).safeMap(toHost),
devCb.numRows())
it = cb.rowIterator()
// In order to match the numOutputRows metric in the generated code we update
// numOutputRows for each batch. This is less accurate than doing it at output
// because it will over count the number of rows output in the case of a limit,
// but it is more efficient.
numOutputRows += cb.numRows()
val sDevCb = SpillableColumnarBatch(devCb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
cb = withRetryNoSplit(sDevCb) { _ =>
withResource(sDevCb.getColumnarBatch()) { devCb =>
withResource(new NvtxWithMetrics("ColumnarToRow: batch", NvtxColor.RED, opTime)) { _ =>
new ColumnarBatch(GpuColumnVector.extractColumns(devCb).safeMap(toHost),
devCb.numRows())
}
}
}
it = cb.rowIterator()
// In order to match the numOutputRows metric in the generated code we update
// numOutputRows for each batch. This is less accurate than doing it at output
// because it will over count the number of rows output in the case of a limit,
// but it is more efficient.
numOutputRows += cb.numRows()
}
} finally {
// Leaving the GPU for a while: if this iterator is configured to release
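This hunk moves the device-to-host copy under the retry framework: the incoming device batch is wrapped in a SpillableColumnarBatch so it can be spilled while waiting for host memory, and the copy itself runs inside withRetryNoSplit so a host-side OOM simply re-executes the block. A minimal sketch of the same pattern, using only helpers that appear in this hunk (the standalone object and the toHost parameter are illustrative, standing in for the iterator's own conversion function):

```scala
import com.nvidia.spark.rapids.{GpuColumnVector, SpillableColumnarBatch, SpillPriorities}
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit

import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

object CopyToHostWithRetry {
  // Sketch: take ownership of `devCb`, make it spillable, and copy it to host
  // under withRetryNoSplit so the copy can be re-attempted after a CPU OOM.
  def apply(devCb: ColumnarBatch)(toHost: ColumnVector => ColumnVector): ColumnarBatch = {
    val spillable = SpillableColumnarBatch(devCb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
    withRetryNoSplit(spillable) { _ =>
      // Re-materialize the device batch on each attempt; a previous attempt may
      // have failed part-way through allocating the host columns.
      withResource(spillable.getColumnarBatch()) { dev =>
        new ColumnarBatch(GpuColumnVector.extractColumns(dev).safeMap(toHost), dev.numRows())
      }
    }
  }
}
```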
@@ -450,10 +450,8 @@ object GpuDeviceManager extends Logging {
logInfo(s"Initializing pinned memory pool (${pinnedSize / 1024 / 1024.0} MiB)")
PinnedMemoryPool.initialize(pinnedSize, gpuId)
}
if (nonPinnedLimit >= 0) {
// Host memory limits must be set after the pinned memory pool is initialized
HostAlloc.initialize(nonPinnedLimit)
}
// Host memory limits must be set after the pinned memory pool is initialized
HostAlloc.initialize(nonPinnedLimit)
}

/**
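With the guard on nonPinnedLimit removed, HostAlloc.initialize now runs for every configuration, which is what guarantees the allocator registration added in HostAlloc.scala below always happens; a negative limit presumably keeps the previous "no explicit limit" behaviour. The ordering comment matters because HostAlloc captures the pinned pool size when it is constructed. A sketch of the intended call order (the wrapper object and parameter names are illustrative):

```scala
import ai.rapids.cudf.PinnedMemoryPool
import com.nvidia.spark.rapids.HostAlloc

object HostMemoryInit {
  // Sketch: the pinned pool must exist before HostAlloc is initialized, because
  // HostAlloc reads PinnedMemoryPool.getTotalPoolSizeBytes in its constructor.
  def initialize(pinnedSize: Long, gpuId: Int, nonPinnedLimit: Long): Unit = {
    if (pinnedSize > 0) {
      PinnedMemoryPool.initialize(pinnedSize, gpuId)
    }
    // Always called now, even when nonPinnedLimit is negative, so the default
    // host memory allocator gets registered in every configuration.
    HostAlloc.initialize(nonPinnedLimit)
  }
}
```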
@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
import ai.rapids.cudf.{ContiguousTable, Cuda, NvtxColor, NvtxRange, Table}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit

import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
@@ -126,7 +127,9 @@ trait GpuPartitioning extends Partitioning {
val mightNeedToSplit = totalInputSize > GpuPartitioning.MaxCpuBatchSize

val hostPartColumns = withResource(partitionColumns) { _ =>
partitionColumns.map(_.copyToHost())
withRetryNoSplit {
partitionColumns.safeMap(_.copyToHost())
}
}
try {
// Leaving the GPU for a while
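The shuffle partition copy gets the same treatment: safeMap closes any host columns already produced if a later copy fails, and withRetryNoSplit re-runs the whole copy once the retry framework has freed host memory. A minimal sketch of the combined pattern, assuming the spark-rapids helpers used in this hunk (the wrapper object is illustrative):

```scala
import com.nvidia.spark.rapids.GpuColumnVector
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit

object CopyPartitionsToHost {
  // Sketch: copy a set of device columns to host under the retry framework,
  // closing the device columns exactly once when the copy (or final retry) is done.
  def apply(partitionColumns: Array[GpuColumnVector]) =
    withResource(partitionColumns) { _ =>
      withRetryNoSplit {
        // On a retry, safeMap has already closed any host columns from the
        // failed attempt, so each attempt starts from a clean slate.
        partitionColumns.safeMap(_.copyToHost())
      }
    }
}
```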
25 changes: 20 additions & 5 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostAlloc.scala
@@ -16,12 +16,12 @@

package com.nvidia.spark.rapids

import ai.rapids.cudf.{HostMemoryBuffer, MemoryBuffer, PinnedMemoryPool}
import com.nvidia.spark.rapids.jni.RmmSpark
import ai.rapids.cudf.{DefaultHostMemoryAllocator, HostMemoryAllocator, HostMemoryBuffer, MemoryBuffer, PinnedMemoryPool}
import com.nvidia.spark.rapids.jni.{CpuRetryOOM, RmmSpark}

import org.apache.spark.internal.Logging

private class HostAlloc(nonPinnedLimit: Long) extends Logging {
private class HostAlloc(nonPinnedLimit: Long) extends HostMemoryAllocator with Logging {
private var currentNonPinnedAllocated: Long = 0L
private val pinnedLimit: Long = PinnedMemoryPool.getTotalPoolSizeBytes
// For now we are going to assume that we are the only ones calling into the pinned pool
@@ -137,7 +137,7 @@ private class HostAlloc(nonPinnedLimit: Long) extends Logging {
s"$storeSize total and $storeSpillableSize spillable bytes. $attemptMsg.")
if (storeSpillableSize == 0) {
logWarning(s"Host store exhausted, unable to allocate $allocSize bytes. " +
s"Total RMM allocated is $totalSize bytes.")
s"Total host allocated is $totalSize bytes.")
false
} else {
val targetSize = Math.max(storeSpillableSize - allocSize, 0)
@@ -213,12 +213,26 @@ private class HostAlloc(nonPinnedLimit: Long) extends Logging {
def alloc(amount: Long, preferPinned: Boolean = true): HostMemoryBuffer = {
checkSize(amount, preferPinned)
var ret = Option.empty[HostMemoryBuffer]
while (ret.isEmpty) {
var count = 0
while (ret.isEmpty && count < 1000) {
val (r, _) = tryAllocInternal(amount, preferPinned, blocking = true)
ret = r
count += 1
}
if (ret.isEmpty) {
// This can happen if someone broke the rules and not all host memory is
// spillable when doing an allocation, like if not all of the code has
// been updated yet.
throw new CpuRetryOOM("Could not complete allocation after 1000 retries")
}
ret.get
}

override def allocate(amount: Long, preferPinned: Boolean): HostMemoryBuffer =
alloc(amount, preferPinned)

override def allocate(amount: Long): HostMemoryBuffer =
alloc(amount)
}
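The blocking allocation loop above is now bounded: if host memory still cannot be found after 1000 attempts (which should only happen when some host allocations are not spillable), it surfaces a CpuRetryOOM instead of spinning forever. Callers that want to handle exhaustion themselves can use the non-blocking path instead; a hypothetical helper built on the tryAlloc shown further down:

```scala
import ai.rapids.cudf.HostMemoryBuffer
import com.nvidia.spark.rapids.HostAlloc
import com.nvidia.spark.rapids.Arm.withResource

object HostStaging {
  // Hypothetical helper: try to get a tracked host buffer without blocking and
  // run `body` with it, returning None if host memory is currently exhausted.
  def withStagingBuffer[T](sizeBytes: Long)(body: HostMemoryBuffer => T): Option[T] =
    HostAlloc.tryAlloc(sizeBytes).map { buf =>
      withResource(buf)(body)
    }
}
```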

/**
@@ -233,6 +247,7 @@ object HostAlloc {

def initialize(nonPinnedLimit: Long): Unit = synchronized {
singleton = new HostAlloc(nonPinnedLimit)
DefaultHostMemoryAllocator.set(singleton)
}

def tryAlloc(amount: Long, preferPinned: Boolean = true): Option[HostMemoryBuffer] = {
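The commit's headline change is the last line of initialize: the HostAlloc singleton is installed as cudf's process-wide default host memory allocator, so host allocations that go through cudf's default allocator are tracked against the same pinned and non-pinned limits as the plugin's own calls. A sketch of the registration pattern using the cudf interfaces imported in this diff (the wrapper class and its names are illustrative):

```scala
import ai.rapids.cudf.{DefaultHostMemoryAllocator, HostMemoryAllocator, HostMemoryBuffer}

// Sketch: implement the two allocate overloads and install the instance as the
// process-wide default. `underlying` is whatever allocator is being wrapped.
class TracingHostAllocator(underlying: HostMemoryAllocator) extends HostMemoryAllocator {
  override def allocate(amount: Long, preferPinned: Boolean): HostMemoryBuffer = {
    println(s"host allocation request: $amount bytes (preferPinned=$preferPinned)")
    underlying.allocate(amount, preferPinned)
  }

  override def allocate(amount: Long): HostMemoryBuffer =
    allocate(amount, preferPinned = true)
}

// Installed once at startup, the same way HostAlloc.initialize does above:
//   DefaultHostMemoryAllocator.set(new TracingHostAllocator(base))
```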
@@ -18,7 +18,7 @@ package com.nvidia.spark.rapids

import ai.rapids.cudf.Table
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.jni.{GpuSplitAndRetryOOM, RmmSpark}
import com.nvidia.spark.rapids.jni.{CpuSplitAndRetryOOM, RmmSpark}
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{doAnswer, spy, times, verify}
import org.mockito.invocation.InvocationOnMock
@@ -97,7 +97,7 @@ class GeneratedInternalRowToCudfRowIteratorRetrySuite
TestUtils.compareBatches(expected, devBatch)
}
}
assertResult(5)(getAndResetNumRetryThrowCurrentTask)
assertResult(6)(getAndResetNumRetryThrowCurrentTask)
assert(!myIter.hasNext)
assertResult(0)(RapidsBufferCatalog.getDeviceStorage.currentSize)
// This is my wrap around of checking that we did retry the last part
@@ -141,7 +141,7 @@ class GeneratedInternalRowToCudfRowIteratorRetrySuite
TestUtils.compareBatches(expected, devBatch)
}
}
assertResult(5)(getAndResetNumRetryThrowCurrentTask)
assertResult(6)(getAndResetNumRetryThrowCurrentTask)
assert(!myIter.hasNext)
assertResult(0)(RapidsBufferCatalog.getDeviceStorage.currentSize)
// This is my wrap around of checking that we did retry the last part
@@ … @@
ctriter, schema, TargetSize(1),
NoopMetric, NoopMetric, NoopMetric, NoopMetric, NoopMetric)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
assertThrows[GpuSplitAndRetryOOM] {
assertThrows[CpuSplitAndRetryOOM] {
myIter.next()
}
assertResult(0)(RapidsBufferCatalog.getDeviceStorage.currentSize)
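The expected retry count goes up by one and the forced split-and-retry now surfaces as CpuSplitAndRetryOOM rather than the GPU flavour, presumably because the host-side allocation in the conversion path is now the one registered with the retry framework. The fault-injection pattern these tests rely on, sketched as a reusable helper (the helper object itself is hypothetical):

```scala
import com.nvidia.spark.rapids.jni.{CpuSplitAndRetryOOM, RmmSpark}
import org.scalatest.Assertions.assertThrows

object RetryTestUtils {
  // Sketch: force the next retryable allocation on the current thread to fail
  // with a split-and-retry OOM, then assert the code under test reports it as a
  // host (CPU) OOM instead of hanging or leaking device memory.
  def expectCpuSplitAndRetry(codeUnderTest: => Unit): Unit = {
    RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
    assertThrows[CpuSplitAndRetryOOM] {
      codeUnderTest
    }
  }
}
```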
@@ -23,7 +23,7 @@ import com.nvidia.spark.rapids.GpuColumnVector.GpuColumnarBatchBuilder
import org.apache.spark.sql.types.{BinaryType, StringType, StructField, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch

class GpuColumnarToRowSuite extends SparkQueryCompareTestSuite {
class GpuColumnarToRowSuite extends RmmSparkRetrySuiteBase {
test("iterate past empty input batches") {
val batchIter: Iterator[ColumnarBatch] = new Iterator[ColumnarBatch] {
private[this] var batchCount = 0
@@ -24,9 +24,9 @@ import org.scalatest.funsuite.AnyFunSuite

import org.apache.spark.sql.SparkSession

class RmmSparkRetrySuiteBase extends AnyFunSuite with BeforeAndAfterEach {
trait RmmSparkRetrySuiteBase extends AnyFunSuite with BeforeAndAfterEach {
private var rmmWasInitialized = false
protected var deviceStorage: RapidsDeviceMemoryStore = null
protected var deviceStorage: RapidsDeviceMemoryStore = _

override def beforeEach(): Unit = {
super.beforeEach()
@@ -18,17 +18,19 @@ package org.apache.spark.sql.rapids.execution

import scala.collection.mutable

import com.nvidia.spark.rapids.{ColumnarRdd, ColumnarToRowIterator, GpuBatchUtilsSuite, GpuColumnVectorUtils, NoopMetric, RapidsHostColumnVector, SparkQueryCompareTestSuite, TestResourceFinder}
import com.nvidia.spark.rapids.{ColumnarRdd, ColumnarToRowIterator, GpuBatchUtilsSuite, GpuColumnVectorUtils, NoopMetric, RapidsHostColumnVector, RmmSparkRetrySuiteBase, SparkQueryCompareTestSuite, TestResourceFinder}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.GpuColumnVector.GpuColumnarBatchBuilder
import org.scalatest.Assertion

import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.util.MapData
import org.apache.spark.sql.types._

class InternalColumnarRDDConverterSuite extends SparkQueryCompareTestSuite {
class InternalColumnarRDDConverterSuite extends SparkQueryCompareTestSuite
with RmmSparkRetrySuiteBase {

def compareMapAndMapDate[K,V](map: collection.Map[K, V], mapData: MapData) = {
def compareMapAndMapDate[K,V](map: collection.Map[K, V], mapData: MapData): Assertion = {
assert(map.size == mapData.numElements())
val outputMap = mutable.Map[Any, Any]()
// Only String now, TODO: support other data types in Map
@@ -46,9 +48,9 @@ class InternalColumnarRDDConverterSuite extends SparkQueryCompareTestSuite {
withResource(new GpuColumnarBatchBuilder(schema, numRows)) { batchBuilder =>
val extR2CConverter = new GpuExternalRowToColumnConverter(schema)
rows.foreach(extR2CConverter.convert(_, batchBuilder))
closeOnExcept(batchBuilder.build(numRows)) { columnarBatch =>
val c2rIterator = new ColumnarToRowIterator(Iterator(columnarBatch),
NoopMetric, NoopMetric, NoopMetric, NoopMetric)
val columnarBatch = batchBuilder.build(numRows)
withResource(new ColumnarToRowIterator(Iterator(columnarBatch),
NoopMetric, NoopMetric, NoopMetric, NoopMetric)) { c2rIterator =>
rows.foreach { input =>
val output = c2rIterator.next()
if (input.isNullAt(0)) {
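Two related adjustments here: the suite now stacks RmmSparkRetrySuiteBase (converted to a trait above) on top of its existing base class so the RMM retry state is set up before each test, and the ColumnarToRowIterator itself is now closed with withResource once the rows have been checked, rather than guarding only the batch with closeOnExcept. A minimal sketch of the mix-in pattern (the suite name and test body are illustrative):

```scala
import com.nvidia.spark.rapids.{RmmSparkRetrySuiteBase, SparkQueryCompareTestSuite}

// Sketch: with RmmSparkRetrySuiteBase now a trait, suites keep their existing
// base class and stack the retry setup on top of it.
class MyRetryAwareSuite extends SparkQueryCompareTestSuite with RmmSparkRetrySuiteBase {
  test("runs with the RMM retry state initialized per test") {
    // beforeEach from RmmSparkRetrySuiteBase has already set up deviceStorage here.
    assert(deviceStorage != null)
  }
}
```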
