Improve host memory spill interfaces #10065

Merged: 5 commits, Dec 19, 2023
@@ -147,55 +147,56 @@ public ColumnarBatch next() {
// Update our estimate for number of rows with the final size used to allocate the buffers.
numRowsEstimate = (int) bufsAndNumRows._2.targetSize();
long dataLength = calcDataLengthEstimate(numRowsEstimate);
try (
SpillableHostBuffer sdb = bufsAndNumRows._1[0];
SpillableHostBuffer sob = bufsAndNumRows._1[1];
int used[];
try (SpillableHostBuffer spillableDataBuffer = bufsAndNumRows._1[0];
SpillableHostBuffer spillableOffsetsBuffer = bufsAndNumRows._1[1];
) {
// Fill in buffer under write lock for host buffers
batchAndRange = sdb.withHostBufferWriteLock( (dataBuffer) -> {
return sob.withHostBufferWriteLock( (offsetsBuffer) -> {
int[] used = fillBatch(dataBuffer, offsetsBuffer, dataLength, numRowsEstimate);
int dataOffset = used[0];
int currentRow = used[1];
// We don't want to loop forever trying to copy nothing
assert (currentRow > 0);
if (numInputRows != null) {
numInputRows.add(currentRow);
}
if (numOutputRows != null) {
numOutputRows.add(currentRow);
}
if (numOutputBatches != null) {
numOutputBatches.add(1);
}
// Now that we have filled the buffers with the data, we need to turn them into a
// HostColumnVector and copy them to the device so the GPU can turn it into a Table.
// To do this we first need to make a HostColumnCoreVector for the data, and then
// put that into a HostColumnVector as its child. This is the basics of building up
// a column of lists of bytes in CUDF but it is typically hidden behind the higher level
// APIs.
dataBuffer.incRefCount();
offsetsBuffer.incRefCount();
try (HostColumnVectorCore dataCv =
new HostColumnVectorCore(DType.INT8, dataOffset, Optional.of(0L),
dataBuffer, null, null, new ArrayList<>());
HostColumnVector hostColumn = new HostColumnVector(DType.LIST,
currentRow, Optional.of(0L), null, null,
offsetsBuffer, Collections.singletonList(dataCv))) {
HostMemoryBuffer[] hBufs =
getHostBuffersWithRetry(spillableDataBuffer, spillableOffsetsBuffer);
try(HostMemoryBuffer dataBuffer = hBufs[0];
HostMemoryBuffer offsetsBuffer = hBufs[1];
) {
used = fillBatch(dataBuffer, offsetsBuffer, dataLength, numRowsEstimate);
int dataOffset = used[0];
int currentRow = used[1];
// We don't want to loop forever trying to copy nothing
assert (currentRow > 0);
if (numInputRows != null) {
numInputRows.add(currentRow);
}
if (numOutputRows != null) {
numOutputRows.add(currentRow);
}
if (numOutputBatches != null) {
numOutputBatches.add(1);
}
// Now that we have filled the buffers with the data, we need to turn them into a
// HostColumnVector and copy them to the device so the GPU can turn it into a Table.
// To do this we first need to make a HostColumnCoreVector for the data, and then
// put that into a HostColumnVector as its child. This is the basics of building up
// a column of lists of bytes in CUDF but it is typically hidden behind the higher level
// APIs.
dataBuffer.incRefCount();
offsetsBuffer.incRefCount();
try (HostColumnVectorCore dataCv =
new HostColumnVectorCore(DType.INT8, dataOffset, Optional.of(0L),
dataBuffer, null, null, new ArrayList<>());
HostColumnVector hostColumn = new HostColumnVector(DType.LIST,
currentRow, Optional.of(0L), null, null,
offsetsBuffer, Collections.singletonList(dataCv))) {

long ct = System.nanoTime() - collectStart;
streamTime.add(ct);
long ct = System.nanoTime() - collectStart;
streamTime.add(ct);

// Grab the semaphore because we are about to put data onto the GPU.
GpuSemaphore$.MODULE$.acquireIfNecessary(TaskContext.get());
NvtxRange range = NvtxWithMetrics.apply("RowToColumnar: build", NvtxColor.GREEN,
Option.apply(opTime));
ColumnVector devColumn =
RmmRapidsRetryIterator.withRetryNoSplit(hostColumn::copyToDevice);
return Tuple2.apply(makeSpillableBatch(devColumn), range);
}
});
});
// Grab the semaphore because we are about to put data onto the GPU.
GpuSemaphore$.MODULE$.acquireIfNecessary(TaskContext.get());
NvtxRange range = NvtxWithMetrics.apply("RowToColumnar: build", NvtxColor.GREEN,
Option.apply(opTime));
ColumnVector devColumn =
RmmRapidsRetryIterator.withRetryNoSplit(hostColumn::copyToDevice);
batchAndRange = Tuple2.apply(makeSpillableBatch(devColumn), range);
}
}
}
try (NvtxRange ignored = batchAndRange._2;
Table tab =
@@ -208,6 +209,20 @@ public ColumnarBatch next() {
}
}

private HostMemoryBuffer[] getHostBuffersWithRetry(
SpillableHostBuffer spillableDataBuffer, SpillableHostBuffer spillableOffsetsBuffer) {
return RmmRapidsRetryIterator.withRetryNoSplit( () -> {
try (HostMemoryBuffer dataBuffer = spillableDataBuffer.getHostBuffer();
HostMemoryBuffer offsetsBuffer = spillableOffsetsBuffer.getHostBuffer();
) {
// Increment these to keep them.
dataBuffer.incRefCount();
offsetsBuffer.incRefCount();
return new HostMemoryBuffer[] { dataBuffer, offsetsBuffer };
}
});
}

private Tuple2<SpillableHostBuffer[], AutoCloseableTargetSize>
allocBuffers(SpillableHostBuffer[] sBufs, AutoCloseableTargetSize numRowsWrapper) {
HostMemoryBuffer[] hBufs = new HostMemoryBuffer[]{ null, null };
@@ -21,7 +21,7 @@ import java.nio.channels.WritableByteChannel

import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, MemoryBuffer, Table}
import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, HostMemoryBuffer, MemoryBuffer, Table}
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.StorageTier.StorageTier
@@ -320,6 +320,15 @@ trait RapidsBuffer extends AutoCloseable {
*/
def getDeviceMemoryBuffer: DeviceMemoryBuffer

/**
* Get the host memory buffer from the underlying storage. If the buffer currently resides
* outside of host memory, a new HostMemoryBuffer is created with the data copied over.
* The caller must have successfully acquired the buffer beforehand.
* @see [[addReference]]
* @note It is the responsibility of the caller to close the buffer.
*/
def getHostMemoryBuffer: HostMemoryBuffer
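
The contract above (acquire a reference first, then the caller closes the returned buffer) is easier to see with a caller-side sketch. This is an illustration only, assuming a RapidsBuffer already looked up from the catalog and a hypothetical readFrom consumer:

import com.nvidia.spark.rapids.Arm.withResource

// Illustration only: `buffer` is a RapidsBuffer obtained from the catalog,
// `readFrom` is a hypothetical consumer of host-resident data.
if (buffer.addReference()) {                       // acquire before use
  try {
    withResource(buffer.getHostMemoryBuffer) { hostBuf =>
      // data is now host-resident, unspilled from lower tiers if necessary
      readFrom(hostBuf)
    }                                              // caller closes the HostMemoryBuffer
  } finally {
    buffer.close()                                 // release the acquired reference
  }
}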

/**
* Try to add a reference to this buffer to acquire it.
* @note The close method must be called for every successfully obtained reference.
@@ -425,6 +434,9 @@ sealed class DegenerateRapidsBuffer(
override def getDeviceMemoryBuffer: DeviceMemoryBuffer =
throw new UnsupportedOperationException("degenerate buffer has no device memory buffer")

override def getHostMemoryBuffer: HostMemoryBuffer =
throw new UnsupportedOperationException("degenerate buffer has no host memory buffer")

override def addReference(): Boolean = true

override def getSpillPriority: Long = Long.MaxValue
@@ -613,14 +613,22 @@ class RapidsBufferCatalog(
}
}

def updateTiers(bufferSpill: BufferSpill): Long = bufferSpill match {
def updateTiers(bufferSpill: SpillAction): Long = bufferSpill match {
case BufferSpill(spilledBuffer, maybeNewBuffer) =>
logDebug(s"Spilled ${spilledBuffer.id} from tier ${spilledBuffer.storageTier}. " +
s"Removing. Registering ${maybeNewBuffer.map(_.id).getOrElse ("None")} " +
s"${maybeNewBuffer}")
maybeNewBuffer.foreach(registerNewBuffer)
removeBufferTier(spilledBuffer.id, spilledBuffer.storageTier)
spilledBuffer.memoryUsedBytes

case BufferUnspill(unspilledBuffer, maybeNewBuffer) =>
logDebug(s"Unspilled ${unspilledBuffer.id} from tier ${unspilledBuffer.storageTier}. " +
s"Removing. Registering ${maybeNewBuffer.map(_.id).getOrElse ("None")} " +
s"${maybeNewBuffer}")
maybeNewBuffer.foreach(registerNewBuffer)
removeBufferTier(unspilledBuffer.id, unspilledBuffer.storageTier)
unspilledBuffer.memoryUsedBytes
}

/**
@@ -647,6 +655,34 @@
}
}

/**
* Copies `buffer` to the `hostStorage` store, registering a new `RapidsBuffer` in
* the process
*
* @param buffer - buffer to copy
* @param stream - Cuda.Stream to synchronize on
* @return - The `RapidsBuffer` instance that was added to the host store.
*/
def unspillBufferToHostStore(
buffer: RapidsBuffer,
stream: Cuda.Stream): RapidsBuffer = synchronized {
// try to acquire the buffer, if it's already in the store
// do not create a new one, else add a reference
acquireBuffer(buffer.id, StorageTier.HOST) match {
case Some(existingBuffer) => existingBuffer
case None =>
val maybeNewBuffer = hostStorage.copyBuffer(buffer, this, stream)
maybeNewBuffer.map { newBuffer =>
Review comment (author): This buffer is initially created as spillable. But it changes to unspillable when the caller does a getHostMemoryBuffer on it. Not sure if I should be concerned about this brief window of spillability?

logDebug(s"got new RapidsHostMemoryStore buffer ${newBuffer.id}")
newBuffer.addReference() // add a reference since we are about to use it
updateTiers(BufferUnspill(buffer, Some(newBuffer)))
buffer.safeFree()
newBuffer
}.get // the host store has to return a buffer here for now, or throw OOM
}
}


/**
* Remove a buffer ID from the catalog at the specified storage tier.
* @note public for testing
@@ -24,21 +24,30 @@ import scala.collection.mutable
import ai.rapids.cudf.{BaseDeviceMemoryBuffer, Cuda, DeviceMemoryBuffer, HostMemoryBuffer, MemoryBuffer, NvtxColor, NvtxRange}
import com.nvidia.spark.rapids.Arm._
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.StorageTier.{DEVICE, StorageTier}
import com.nvidia.spark.rapids.StorageTier.{DEVICE, HOST, StorageTier}
import com.nvidia.spark.rapids.format.TableMeta

import org.apache.spark.internal.Logging
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
* A helper case class that contains the buffer we spilled from our current tier
* and likely a new buffer created in a spill store tier, but it can be set to None.
* If the buffer already exists in the target spill store, `newBuffer` will be None.
* @param spilledBuffer a `RapidsBuffer` we spilled from this store
* @param newBuffer an optional `RapidsBuffer` in the target spill store.
* Helper case classes that contain the buffer we spilled or unspilled from our current tier
* and likely a new buffer created in a target store tier, but it can be set to None.
* If the buffer already exists in the target store, `newBuffer` will be None.
* @param spillBuffer a `RapidsBuffer` we spilled or unspilled from this store
* @param newBuffer an optional `RapidsBuffer` in the target store.
*/
case class BufferSpill(spilledBuffer: RapidsBuffer, newBuffer: Option[RapidsBuffer])
trait SpillAction {
val spillBuffer: RapidsBuffer
val newBuffer: Option[RapidsBuffer]
}

case class BufferSpill(spillBuffer: RapidsBuffer, newBuffer: Option[RapidsBuffer])
extends SpillAction

case class BufferUnspill(spillBuffer: RapidsBuffer, newBuffer: Option[RapidsBuffer])
extends SpillAction
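
As a quick illustration (hypothetical helpers, not part of this change), the shared trait lets callers use the two common fields directly, while a pattern match still distinguishes the direction when it matters:

// Illustration only.
def bytesMoved(action: SpillAction): Long = action.spillBuffer.memoryUsedBytes

def describe(action: SpillAction): String = action match {
  case BufferSpill(buf, newBuf) =>
    s"spilled ${buf.id} -> ${newBuf.map(_.id).getOrElse("None")}"
  case BufferUnspill(buf, newBuf) =>
    s"unspilled ${buf.id} -> ${newBuf.map(_.id).getOrElse("None")}"
}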

/**
* Base class for all buffer store types.
@@ -307,7 +316,7 @@ abstract class RapidsBufferStore(val tier: StorageTier)
// as it has already spilled.
BufferSpill(nextSpillableBuffer, None)
}
totalSpilled += bufferSpill.spilledBuffer.memoryUsedBytes
totalSpilled += bufferSpill.spillBuffer.memoryUsedBytes
bufferSpills.append(bufferSpill)
catalog.updateTiers(bufferSpill)
}
@@ -333,7 +342,7 @@
// the buffer via events.
// https://github.com/NVIDIA/spark-rapids/issues/8610
Cuda.deviceSynchronize()
bufferSpills.foreach(_.spilledBuffer.safeFree())
bufferSpills.foreach(_.spillBuffer.safeFree())
}
}
}
@@ -516,6 +525,31 @@
}
}

override def getHostMemoryBuffer: HostMemoryBuffer = {
(0 until MAX_UNSPILL_ATTEMPTS).foreach { _ =>
catalog.acquireBuffer(id, HOST) match {
case Some(buffer) =>
withResource(buffer) { _ =>
return buffer.getHostMemoryBuffer
}
case _ =>
try {
logDebug(s"Unspilling $this $id to $HOST")
val newBuffer = catalog.unspillBufferToHostStore(
this,
Cuda.DEFAULT_STREAM)
withResource(newBuffer) { _ =>
return newBuffer.getHostMemoryBuffer
}
} catch {
case _: DuplicateBufferException =>
logDebug(s"Lost host buffer registration race for buffer $id, retrying...")
}
}
}
throw new IllegalStateException(s"Unable to get host memory buffer for ID: $id")
}

/**
* close() is called by client code to decrease the ref count of this RapidsBufferBase.
* In the off chance that by the time close is invoked, the buffer was freed (not valid)
@@ -140,62 +140,47 @@ class RapidsDiskStore(diskBlockManager: RapidsDiskBlockManager)
meta: TableMeta,
spillPriority: Long)
extends RapidsBufferBase(id, meta, spillPriority) {
private[this] var hostBuffer: Option[HostMemoryBuffer] = None

// FIXME: Need to be clean up. Tracked in https://github.com/NVIDIA/spark-rapids/issues/9496
override val memoryUsedBytes: Long = uncompressedSize

override val storageTier: StorageTier = StorageTier.DISK

override def getMemoryBuffer: MemoryBuffer = synchronized {
if (hostBuffer.isEmpty) {
require(onDiskSizeInBytes > 0,
s"$this attempted an invalid 0-byte mmap of a file")
val path = id.getDiskPath(diskBlockManager)
val serializerManager = diskBlockManager.getSerializerManager()
val memBuffer = if (serializerManager.isRapidsSpill(id)) {
// Only go through serializerManager's stream wrapper for spill case
closeOnExcept(HostMemoryBuffer.allocate(uncompressedSize)) { decompressed =>
GpuTaskMetrics.get.readSpillFromDiskTime {
withResource(FileChannel.open(path.toPath, StandardOpenOption.READ)) { c =>
c.position(fileOffset)
withResource(Channels.newInputStream(c)) { compressed =>
withResource(serializerManager.wrapStream(id, compressed)) { in =>
withResource(new HostMemoryOutputStream(decompressed)) { out =>
IOUtils.copy(in, out)
}
decompressed
require(onDiskSizeInBytes > 0,
s"$this attempted an invalid 0-byte mmap of a file")
val path = id.getDiskPath(diskBlockManager)
val serializerManager = diskBlockManager.getSerializerManager()
val memBuffer = if (serializerManager.isRapidsSpill(id)) {
// Only go through serializerManager's stream wrapper for spill case
closeOnExcept(HostAlloc.alloc(uncompressedSize)) {
Review comment (author): Adding this HostAlloc.alloc() is what really increases how much heap memory I can get away with when running queries. This also requires that any code that ends up in here will likely need a retry.
(See the retry sketch after this method.)

decompressed => GpuTaskMetrics.get.readSpillFromDiskTime {
withResource(FileChannel.open(path.toPath, StandardOpenOption.READ)) { c =>
c.position(fileOffset)
withResource(Channels.newInputStream(c)) { compressed =>
withResource(serializerManager.wrapStream(id, compressed)) { in =>
withResource(new HostMemoryOutputStream(decompressed)) { out =>
IOUtils.copy(in, out)
}
decompressed
}
}
}
}
} else {
// Reserved mmap read fashion for UCX shuffle path. Also it's skipping encryption and
// compression.
HostMemoryBuffer.mapFile(path, MapMode.READ_WRITE, fileOffset, onDiskSizeInBytes)
}
hostBuffer = Some(memBuffer)
} else {
// Reserved mmap read fashion for UCX shuffle path. Also it's skipping encryption and
// compression.
HostMemoryBuffer.mapFile(path, MapMode.READ_WRITE, fileOffset, onDiskSizeInBytes)
}
hostBuffer.foreach(_.incRefCount())
hostBuffer.get
memBuffer
}
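
Following up on the review note above: a minimal sketch of how a host allocation on this path could be made retryable, assuming the Scala overload of RmmRapidsRetryIterator.withRetryNoSplit that takes a by-name block (names reused from the surrounding code for illustration):

// Sketch only: wrap the host allocation in the retry framework so an allocation
// failure can trigger spill-and-retry instead of failing outright.
val decompressed: HostMemoryBuffer = RmmRapidsRetryIterator.withRetryNoSplit {
  HostAlloc.alloc(uncompressedSize)
}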

override def close(): Unit = synchronized {
if (refcount == 1) {
// free the memory mapping since this is the last active reader
hostBuffer.foreach { b =>
logDebug(s"closing mmap buffer $b")
b.close()
}
hostBuffer = None
}
super.close()
}

override protected def releaseResources(): Unit = {
require(hostBuffer.isEmpty,
"Releasing a disk buffer with non-empty host buffer")
// Buffers that share paths must be cleaned up elsewhere
if (id.canShareDiskPaths) {
sharedBufferFiles.remove(id)