Add cpu oom retry split handling to InternalRowToColumnarBatchIterator #10011

Merged 4 commits on Dec 12, 2023
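
At a high level, this change replaces the fixed up-front host allocations in InternalRowToColumnarBatchIterator with a retry block that halves the row-count estimate on a CPU OOM and re-derives the data and offsets buffer sizes from the smaller estimate. A minimal sketch of that pattern, assuming the withRetry, splitTargetSizeInHalfCpu, and AutoCloseableTargetSize names exactly as they appear in the diff below; the body of the retry block is only a placeholder:

// Sketch of the CPU OOM retry-with-split pattern this PR applies.
// Assumes AutoCloseableTargetSize lives in com.nvidia.spark.rapids.
import com.nvidia.spark.rapids.{AutoCloseableTargetSize, RmmRapidsRetryIterator}
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.splitTargetSizeInHalfCpu

def allocateWithCpuRetry(initialRowEstimate: Long): Long = {
  // Wrap the row-count target; 1 is the floor the split policy will not go below.
  val rowsWrapper = new AutoCloseableTargetSize(initialRowEstimate, 1)
  // withRetry reruns the body with a halved target after a split-and-retry OOM.
  val results = RmmRapidsRetryIterator.withRetry(rowsWrapper, splitTargetSizeInHalfCpu) { attempt =>
    val numRows = attempt.targetSize
    // allocate the data and offsets buffers sized from numRows here
    // (see allocBuffers / allocBuffersWithRestore in the diff below)
    numRows
  }
  results.next() // the row count that finally succeeded
}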
InternalRowToColumnarBatchIterator.java
@@ -21,6 +21,7 @@
import java.util.NoSuchElementException;
import java.util.Optional;

import com.nvidia.spark.Retryable;
import scala.Option;
import scala.Tuple2;
import scala.collection.Iterator;
@@ -54,8 +55,8 @@
public abstract class InternalRowToColumnarBatchIterator implements Iterator<ColumnarBatch> {
protected final Iterator<InternalRow> input;
protected UnsafeRow pending = null;
protected final int numRowsEstimate;
protected final long dataLength;
protected int numRowsEstimate = 1;
protected final int sizePerRowEstimate;
protected final DType[] rapidsTypes;
protected final DataType[] outputTypes;
protected final GpuMetric streamTime;
@@ -74,10 +75,8 @@ protected InternalRowToColumnarBatchIterator(
GpuMetric numOutputRows,
GpuMetric numOutputBatches) {
this.input = input;
int sizePerRowEstimate = CudfUnsafeRow.getRowSizeEstimate(schema);
numRowsEstimate = (int)Math.max(1,
Math.min(Integer.MAX_VALUE - 1, goal.targetSizeBytes() / sizePerRowEstimate));
dataLength = ((long) sizePerRowEstimate) * numRowsEstimate;
sizePerRowEstimate = CudfUnsafeRow.getRowSizeEstimate(schema);
numRowsEstimate = calcNumRowsEstimate(goal.targetSizeBytes());
rapidsTypes = new DType[schema.length];
outputTypes = new DataType[schema.length];

@@ -92,6 +91,20 @@ protected InternalRowToColumnarBatchIterator(
this.numOutputBatches = numOutputBatches;
}

private int calcNumRowsEstimate(long targetBytes) {
return Math.max(1,
Math.min(Integer.MAX_VALUE - 1, (int) (targetBytes / sizePerRowEstimate)));
}

private long calcDataLengthEstimate(int numRows) {
return ((long) sizePerRowEstimate) * numRows;
}

private long calcOffsetLengthEstimate(int numRows) {
int BYTES_PER_OFFSET = DType.INT32.getSizeInBytes();
return (long)(numRows + 1) * BYTES_PER_OFFSET;
}

@Override
public boolean hasNext() {
boolean ret = true;
@@ -109,87 +122,80 @@ public ColumnarBatch next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
final int BYTES_PER_OFFSET = DType.INT32.getSizeInBytes();

long collectStart = System.nanoTime();

long collectStart = System.nanoTime();
Tuple2<SpillableColumnarBatch, NvtxRange> batchAndRange;
AutoCloseableTargetSize numRowsWrapper =
new AutoCloseableTargetSize(numRowsEstimate, 1);
Tuple2<SpillableHostBuffer[], AutoCloseableTargetSize> bufsAndNumRows;

// The row formatted data is stored as a column of lists of bytes. The current java CUDF APIs
// don't do a great job from a performance standpoint with building this type of data structure
// and we want this to be as efficient as possible so we are going to allocate two host memory
// buffers. One will be for the byte data and the second will be for the offsets. We will then
// write the data directly into those buffers using code generation in a child of this class
// that implements fillBatch.
HostMemoryBuffer db =
RmmRapidsRetryIterator.withRetryNoSplit( () -> {
return HostAlloc$.MODULE$.alloc(dataLength, true);
});
bufsAndNumRows =
// Starting with the initial row-count estimate, this retry block recalculates the buffer
// sizes from that estimate, halving it on a split-and-retry OOM, until we succeed or
// hit the minimum of 1 row.
RmmRapidsRetryIterator.withRetry(numRowsWrapper,
RmmRapidsRetryIterator.splitTargetSizeInHalfCpu(), (numRows) -> {
return allocBuffersWithRestore(numRows);
}).next();
// Update our estimate for number of rows with the final size used to allocate the buffers.
numRowsEstimate = (int) bufsAndNumRows._2.targetSize();
long dataLength = calcDataLengthEstimate(numRowsEstimate);
try (
SpillableHostBuffer sdb = SpillableHostBuffer$.MODULE$.apply(db, db.getLength(),
SpillPriorities$.MODULE$.ACTIVE_ON_DECK_PRIORITY(),
RapidsBufferCatalog$.MODULE$.singleton());
SpillableHostBuffer sdb = bufsAndNumRows._1[0];
SpillableHostBuffer sob = bufsAndNumRows._1[1];
) {
HostMemoryBuffer ob =
RmmRapidsRetryIterator.withRetryNoSplit( () -> {
return HostAlloc$.MODULE$.alloc(
((long) numRowsEstimate + 1) * BYTES_PER_OFFSET, true);
});
try (
SpillableHostBuffer sob = SpillableHostBuffer$.MODULE$.apply(ob, ob.getLength(),
SpillPriorities$.MODULE$.ACTIVE_ON_DECK_PRIORITY(),
RapidsBufferCatalog$.MODULE$.singleton());
) {
// Fill in buffer under write lock for host buffers
int[] used = sdb.withHostBufferWriteLock( (dataBuffer) -> {
return sob.withHostBufferWriteLock( (offsetsBuffer) -> {
return fillBatch(dataBuffer, offsetsBuffer);
});
});
batchAndRange = sdb.withHostBufferReadOnly( (dataBuffer) -> {
return sob.withHostBufferReadOnly( (offsetsBuffer) -> {
int dataOffset = used[0];
int currentRow = used[1];
// We don't want to loop forever trying to copy nothing
assert (currentRow > 0);
if (numInputRows != null) {
numInputRows.add(currentRow);
}
if (numOutputRows != null) {
numOutputRows.add(currentRow);
}
if (numOutputBatches != null) {
numOutputBatches.add(1);
}
// Now that we have filled the buffers with the data, we need to turn them into a
// HostColumnVector and copy them to the device so the GPU can turn it into a Table.
// To do this we first need to make a HostColumnCoreVector for the data, and then
// put that into a HostColumnVector as its child. This is the basics of building up
// a column of lists of bytes in CUDF but it is typically hidden behind the higher level
// APIs.
dataBuffer.incRefCount();
offsetsBuffer.incRefCount();
try (HostColumnVectorCore dataCv =
new HostColumnVectorCore(DType.INT8, dataOffset, Optional.of(0L),
dataBuffer, null, null, new ArrayList<>());
HostColumnVector hostColumn = new HostColumnVector(DType.LIST,
currentRow, Optional.of(0L), null, null,
offsetsBuffer, Collections.singletonList(dataCv))) {

long ct = System.nanoTime() - collectStart;
streamTime.add(ct);

// Grab the semaphore because we are about to put data onto the GPU.
GpuSemaphore$.MODULE$.acquireIfNecessary(TaskContext.get());
NvtxRange range = NvtxWithMetrics.apply("RowToColumnar: build", NvtxColor.GREEN,
Option.apply(opTime));
ColumnVector devColumn =
RmmRapidsRetryIterator.withRetryNoSplit(hostColumn::copyToDevice);
return Tuple2.apply(makeSpillableBatch(devColumn), range);
}
});
// Fill in buffer under write lock for host buffers
batchAndRange = sdb.withHostBufferWriteLock( (dataBuffer) -> {
return sob.withHostBufferWriteLock( (offsetsBuffer) -> {
int[] used = fillBatch(dataBuffer, offsetsBuffer, dataLength, numRowsEstimate);
int dataOffset = used[0];
int currentRow = used[1];
// We don't want to loop forever trying to copy nothing
assert (currentRow > 0);
if (numInputRows != null) {
numInputRows.add(currentRow);
}
if (numOutputRows != null) {
numOutputRows.add(currentRow);
}
if (numOutputBatches != null) {
numOutputBatches.add(1);
}
// Now that we have filled the buffers with the data, we need to turn them into a
// HostColumnVector and copy them to the device so the GPU can turn it into a Table.
// To do this we first need to make a HostColumnCoreVector for the data, and then
// put that into a HostColumnVector as its child. This is the basics of building up
// a column of lists of bytes in CUDF but it is typically hidden behind the higher level
// APIs.
dataBuffer.incRefCount();
offsetsBuffer.incRefCount();
try (HostColumnVectorCore dataCv =
new HostColumnVectorCore(DType.INT8, dataOffset, Optional.of(0L),
dataBuffer, null, null, new ArrayList<>());
HostColumnVector hostColumn = new HostColumnVector(DType.LIST,
currentRow, Optional.of(0L), null, null,
offsetsBuffer, Collections.singletonList(dataCv))) {

long ct = System.nanoTime() - collectStart;
streamTime.add(ct);

// Grab the semaphore because we are about to put data onto the GPU.
GpuSemaphore$.MODULE$.acquireIfNecessary(TaskContext.get());
NvtxRange range = NvtxWithMetrics.apply("RowToColumnar: build", NvtxColor.GREEN,
Option.apply(opTime));
ColumnVector devColumn =
RmmRapidsRetryIterator.withRetryNoSplit(hostColumn::copyToDevice);
return Tuple2.apply(makeSpillableBatch(devColumn), range);
}
});
}
});
}
try (NvtxRange ignored = batchAndRange._2;
Table tab =
@@ -202,6 +208,63 @@ public ColumnarBatch next() {
}
}

private Tuple2<SpillableHostBuffer[], AutoCloseableTargetSize>
allocBuffers(SpillableHostBuffer[] sBufs, AutoCloseableTargetSize numRowsWrapper) {
HostMemoryBuffer[] hBufs = new HostMemoryBuffer[]{ null, null };
try {
long dataBytes = calcDataLengthEstimate((int) numRowsWrapper.targetSize());
long offsetBytes = calcOffsetLengthEstimate((int) numRowsWrapper.targetSize());
hBufs[0] = HostAlloc$.MODULE$.alloc(dataBytes, true);
sBufs[0] = SpillableHostBuffer$.MODULE$.apply(hBufs[0], hBufs[0].getLength(),
SpillPriorities$.MODULE$.ACTIVE_ON_DECK_PRIORITY(),
RapidsBufferCatalog$.MODULE$.singleton());
hBufs[0] = null; // Was closed by spillable
hBufs[1] = HostAlloc$.MODULE$.alloc(offsetBytes, true);
sBufs[1] = SpillableHostBuffer$.MODULE$.apply(hBufs[1], hBufs[1].getLength(),
SpillPriorities$.MODULE$.ACTIVE_ON_DECK_PRIORITY(),
RapidsBufferCatalog$.MODULE$.singleton());
hBufs[1] = null; // Was closed by spillable
return Tuple2.apply(sBufs, numRowsWrapper);
} finally {
// Make sure host buffers are always closed
for (int i = 0; i < hBufs.length; i++) {
if (hBufs[i] != null) {
hBufs[i].close();
hBufs[i] = null;
}
}
// If the second spillable buffer is null, we must have thrown,
// so we need to close the first one in case this is not a retry exception.
// Restore on retry is handled by the caller.
if ((sBufs[1] == null) && (sBufs[0] != null)) {
sBufs[0].close();
sBufs[0] = null;
}
}
}

private Tuple2<SpillableHostBuffer[], AutoCloseableTargetSize>
allocBuffersWithRestore(AutoCloseableTargetSize numRows) {
SpillableHostBuffer[] spillableBufs = new SpillableHostBuffer[]{ null, null};
Retryable retryBufs = new Retryable() {
@Override
public void checkpoint() {}
@Override
public void restore() {
for (int i = 0; i < spillableBufs.length; i++) {
if (spillableBufs[i] != null) {
spillableBufs[i].close();
spillableBufs[i] = null;
}
}
}
};

return RmmRapidsRetryIterator.withRestoreOnRetry(retryBufs, () -> {
return allocBuffers(spillableBufs, numRows);
});
}

/**
* Take our device column of encoded rows and turn it into a spillable columnar batch.
* This allows us to go into a retry block and be able to roll back our work.
@@ -244,8 +307,11 @@ protected Table convertFromRowsUnderRetry(ColumnarBatch cb) {
* virtual function call per batch instead of one per row.
* @param dataBuffer the data buffer to populate
* @param offsetsBuffer the offsets buffer to populate
* @param dataLength the data length corresponding to the current rows estimate.
* @param numRows the number of rows we can fill
* @return an array of ints where the first index is the amount of data in bytes copied into
* the data buffer and the second index is the number of rows copied into the buffers.
*/
public abstract int[] fillBatch(HostMemoryBuffer dataBuffer, HostMemoryBuffer offsetsBuffer);
public abstract int[] fillBatch(HostMemoryBuffer dataBuffer, HostMemoryBuffer offsetsBuffer,
long dataLength, int numRows);
}
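
For a feel of the numbers the new estimate helpers produce, here is an illustrative example (made-up values) assuming a 64-byte per-row estimate and a 1 GiB target batch size; the helpers below mirror the intent of calcNumRowsEstimate, calcDataLengthEstimate, and calcOffsetLengthEstimate above, written with long arithmetic:

// Illustrative arithmetic for the size-estimate helpers (all values assumed).
val sizePerRowEstimate = 64      // bytes per row, as CudfUnsafeRow.getRowSizeEstimate might return
val bytesPerOffset = 4           // DType.INT32 size in bytes
val targetBytes = 1L << 30       // 1 GiB batch target

def numRowsEstimate(target: Long): Int =
  math.max(1L, math.min((Int.MaxValue - 1).toLong, target / sizePerRowEstimate)).toInt
def dataLengthEstimate(numRows: Int): Long = sizePerRowEstimate.toLong * numRows
def offsetLengthEstimate(numRows: Int): Long = (numRows + 1).toLong * bytesPerOffset

val rows = numRowsEstimate(targetBytes)   // 16,777,216 rows
val data = dataLengthEstimate(rows)       // 1 GiB data buffer
val offs = offsetLengthEstimate(rows)     // about 64 MiB offsets buffer
// After one split-and-retry OOM the row target is halved, so both buffers shrink in step.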
AbstractGpuJoinIterator.scala
@@ -19,7 +19,7 @@ package com.nvidia.spark.rapids
import ai.rapids.cudf.{GatherMap, NvtxColor, OutOfBoundsPolicy}
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitTargetSizeInHalf, withRestoreOnRetry, withRetry}
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitTargetSizeInHalfGpu, withRestoreOnRetry, withRetry}
import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion

import org.apache.spark.TaskContext
@@ -148,7 +148,7 @@ abstract class AbstractGpuJoinIterator(
// less from the gatherer, but because the gatherer tracks how much is used, the
// next call to this function will start in the right place.
gather.checkpoint()
withRetry(targetSizeWrapper, splitTargetSizeInHalf) { attempt =>
withRetry(targetSizeWrapper, splitTargetSizeInHalfGpu) { attempt =>
withRestoreOnRetry(gather) {
val nextRows = JoinGatherer.getRowsInNextBatch(gather, attempt.targetSize)
gather.gatherNext(nextRows)
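
The only change in this file is that the former splitTargetSizeInHalf policy becomes the GPU-specific splitTargetSizeInHalfGpu, leaving the new splitTargetSizeInHalfCpu for host-memory retries like the one added above. The restore-on-retry half of that pattern, used by allocBuffersWithRestore, can be sketched as follows; only the Retryable trait and the withRestoreOnRetry call match names from the diff, everything else is illustrative:

// Sketch: release partially-allocated buffers if a retry OOM interrupts allocation,
// mirroring the shape of allocBuffersWithRestore in the Java file above.
import scala.collection.mutable.ArrayBuffer

import com.nvidia.spark.Retryable
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRestoreOnRetry

final class ScratchAllocations extends Retryable {
  private val allocated = ArrayBuffer.empty[AutoCloseable]
  def track[T <: AutoCloseable](resource: T): T = { allocated += resource; resource }
  override def checkpoint(): Unit = ()    // nothing to snapshot before an attempt
  override def restore(): Unit = {        // undo the partial attempt on retry
    allocated.foreach(_.close())
    allocated.clear()
  }
}

def allocateBoth(scratch: ScratchAllocations): Unit =
  withRestoreOnRetry(scratch) {
    // Allocate the data buffer, then the offsets buffer, registering each with
    // scratch.track(...). If the second allocation throws a retry OOM, restore()
    // releases the first before the caller's retry block runs the body again.
  }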
GeneratedInternalRowToCudfRowIterator
@@ -787,7 +787,8 @@ object GeneratedInternalRowToCudfRowIterator extends Logging {
| // of a row at a time.
| @Override
| public int[] fillBatch(ai.rapids.cudf.HostMemoryBuffer dataBuffer,
| ai.rapids.cudf.HostMemoryBuffer offsetsBuffer) {
| ai.rapids.cudf.HostMemoryBuffer offsetsBuffer,
| long dataLength, int numRows) {
| final long dataBaseAddress = dataBuffer.getAddress();
| final long endDataAddress = dataBaseAddress + dataLength;
|
@@ -820,7 +821,7 @@ object GeneratedInternalRowToCudfRowIterator extends Logging {
| } else {
| currentRow += 1;
| dataOffset += numBytesUsedByRow;
| done = !(currentRow < numRowsEstimate &&
| done = !(currentRow < numRows &&
| dataOffset < dataLength &&
| input.hasNext());
| }
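
With the signature change above, the generated loop is bounded by the numRows and dataLength arguments supplied by the caller, which reflect the possibly-halved estimate, instead of the fixed numRowsEstimate field. A simplified, non-generated sketch of that termination logic follows; the per-row byte size stands in for what the generated copy code computes, and the fit-check and pending-row handling of the real code are omitted:

// Simplified sketch of the fillBatch loop-termination contract: stop when the row
// budget, the byte budget, or the input runs out, and report what was used.
def fillBatchSketch(rowSizes: Iterator[Int], dataLength: Long, numRows: Int): Array[Int] = {
  var dataOffset = 0L
  var currentRow = 0
  var done = !rowSizes.hasNext
  while (!done) {
    val numBytesUsedByRow = rowSizes.next() // real code also checks the row actually fits
    currentRow += 1
    dataOffset += numBytesUsedByRow
    done = !(currentRow < numRows &&
             dataOffset < dataLength &&
             rowSizes.hasNext)
  }
  Array(dataOffset.toInt, currentRow)       // bytes used, rows copied
}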
HostAlloc.scala
@@ -144,7 +144,7 @@ private class HostAlloc(nonPinnedLimit: Long) extends HostMemoryAllocator with L
logDebug(s"Targeting host store size of $targetSize bytes")
// We could not make it work so try and spill enough to make it work
val maybeAmountSpilled =
RapidsBufferCatalog.synchronousSpill(RapidsBufferCatalog.getHostStorage, allocSize)
RapidsBufferCatalog.synchronousSpill(RapidsBufferCatalog.getHostStorage, targetSize)
Collaborator review comment on this change: nice catch

maybeAmountSpilled.foreach { amountSpilled =>
logInfo(s"Spilled $amountSpilled bytes from the host store")
}
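
The HostAlloc fix is a one-liner: the spill call now receives the computed store-size target rather than the size of the single allocation being attempted. Assuming synchronousSpill(store, n) spills until the store holds at most n bytes, as the "Targeting host store size" debug message suggests, the two arguments can differ by gigabytes; the numbers below are purely illustrative:

// Illustrative only: how far the host store would be spilled under each argument.
val currentStoreBytes = 10L << 30   // 10 GiB currently resident in the host store
val allocSize         = 512L << 20  // the single allocation we are trying to satisfy
val targetSize        = 4L << 30    // the computed target size for the whole store
val spilledUsingAllocSize  = currentStoreBytes - allocSize   // roughly 9.5 GiB evicted
val spilledUsingTargetSize = currentStoreBytes - targetSize  // 6 GiB evicted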