Additional unit tests for GeneratedInternalRowToCudfRowIterator (#10087)
Signed-off-by: Jim Brennan <jimb@nvidia.com>
jbrennan333 authored Jan 2, 2024
1 parent d39c63a commit cd2b78b
Showing 2 changed files with 121 additions and 10 deletions.
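
The API change running through this diff: RmmSpark's OOM-injection hooks, forceRetryOOM and forceSplitAndRetryOOM, now take an explicit OOM count, an injection-type ordinal, and a skip count in addition to the thread id, and the CpuSplitAndRetryOOM exception gives way to GpuSplitAndRetryOOM for device-side splits. A minimal sketch of the new form, with argument meanings inferred from the calls in this diff rather than from RmmSpark's documented signature:

import com.nvidia.spark.rapids.jni.RmmSpark

object InjectionSketch {
  def main(args: Array[String]): Unit = {
    // Arm one retry-OOM against the current thread. Inferred meanings:
    //   arg 1: thread id to inject into
    //   arg 2: number of OOMs to inject
    //   arg 3: injection-site ordinal; CPU hits host allocations,
    //          GPU hits device allocations
    //   arg 4: matching allocations to skip first; in the tests below,
    //          0 lands on the dataBuffer and 1 on the offsetsBuffer
    RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
      RmmSpark.OomInjectionType.CPU.ordinal, 0)
  }
}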
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,7 +18,7 @@ package com.nvidia.spark.rapids

import ai.rapids.cudf.Table
import com.nvidia.spark.rapids.Arm.withResource
-import com.nvidia.spark.rapids.jni.{CpuSplitAndRetryOOM, RmmSpark}
+import com.nvidia.spark.rapids.jni.{GpuSplitAndRetryOOM, RmmSpark}
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{doAnswer, spy, times, verify}
import org.mockito.invocation.InvocationOnMock
@@ -56,7 +56,8 @@ class GeneratedInternalRowToCudfRowIteratorRetrySuite
ctriter, schema, TargetSize(Int.MaxValue),
NoopMetric, NoopMetric, NoopMetric, NoopMetric, NoopMetric)
// this forces a retry on the copy of the host column to a device column
-      RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId)
+      RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
+        RmmSpark.OomInjectionType.GPU.ordinal, 0)
withResource(myIter.next()) { devBatch =>
withResource(buildBatch()) { expected =>
TestUtils.compareBatches(expected, devBatch)
@@ -77,7 +78,8 @@ class GeneratedInternalRowToCudfRowIteratorRetrySuite
val res = invocation.callRealMethod()
// we mock things this way due to code generation issues with mockito.
// when we add a table we have
-        RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 3)
+        RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 3,
+          RmmSpark.OomInjectionType.GPU.ordinal, 0)
rapidsBufferSpy = spy(res.asInstanceOf[RapidsBuffer])
rapidsBufferSpy
}
@@ -90,7 +92,8 @@ class GeneratedInternalRowToCudfRowIteratorRetrySuite
val myIter = spy(GeneratedInternalRowToCudfRowIterator(
ctriter, schema, TargetSize(Int.MaxValue),
NoopMetric, NoopMetric, NoopMetric, NoopMetric, NoopMetric))
-      RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 2)
+      RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 2,
+        RmmSpark.OomInjectionType.GPU.ordinal, 0)
assertResult(0)(getAndResetNumRetryThrowCurrentTask)
withResource(myIter.next()) { devBatch =>
withResource(buildBatch()) { expected =>
@@ -118,7 +121,8 @@ class GeneratedInternalRowToCudfRowIteratorRetrySuite
val res = invocation.callRealMethod()
// we mock things this way due to code generation issues with mockito.
// when we add a table we have
-        RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 3)
+        RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 3,
+          RmmSpark.OomInjectionType.GPU.ordinal, 0)
rapidsBufferSpy = spy(res.asInstanceOf[RapidsBuffer])
// at this point we have created a buffer in the Spill Framework
// lets spill it
@@ -134,7 +138,8 @@ class GeneratedInternalRowToCudfRowIteratorRetrySuite
val myIter = spy(GeneratedInternalRowToCudfRowIterator(
ctriter, schema, TargetSize(Int.MaxValue),
NoopMetric, NoopMetric, NoopMetric, NoopMetric, NoopMetric))
-      RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 2)
+      RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 2,
+        RmmSpark.OomInjectionType.GPU.ordinal, 0)
assertResult(0)(getAndResetNumRetryThrowCurrentTask)
withResource(myIter.next()) { devBatch =>
withResource(buildBatch()) { expected =>
@@ -163,11 +168,115 @@ class GeneratedInternalRowToCudfRowIteratorRetrySuite
val myIter = GeneratedInternalRowToCudfRowIterator(
ctriter, schema, TargetSize(1),
NoopMetric, NoopMetric, NoopMetric, NoopMetric, NoopMetric)
-      RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
-      assertThrows[CpuSplitAndRetryOOM] {
+      RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
+        RmmSpark.OomInjectionType.GPU.ordinal, 0)
+      assertThrows[GpuSplitAndRetryOOM] {
myIter.next()
}
assertResult(0)(RapidsBufferCatalog.getDeviceStorage.currentSize)
}
}

test("a retry when allocating dataBuffer is handled") {
val batch = buildBatch()
val batchIter = Seq(batch).iterator
withResource(new ColumnarToRowIterator(batchIter, NoopMetric, NoopMetric, NoopMetric,
NoopMetric)) { ctriter =>
val schema = Array(AttributeReference("longcol", LongType)().toAttribute)
val myIter = GeneratedInternalRowToCudfRowIterator(
ctriter, schema, TargetSize(Int.MaxValue),
NoopMetric, NoopMetric, NoopMetric, NoopMetric, NoopMetric)
// Do this so we can avoid forcing failures in any host allocations
// in ColumnarToRowIterator.hasNext()
assert(ctriter.hasNext)
// this forces a retry on the allocation of the dataBuffer
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.CPU.ordinal, 0)
withResource(myIter.next()) { devBatch =>
withResource(buildBatch()) { expected =>
TestUtils.compareBatches(expected, devBatch)
}
}
assert(!GpuColumnVector.extractBases(batch).exists(_.getRefCount > 0))
assert(!myIter.hasNext)
assertResult(0)(RapidsBufferCatalog.getDeviceStorage.currentSize)
}
}

test("a retry when allocating offsetsBuffer is handled") {
val batch = buildBatch()
val batchIter = Seq(batch).iterator
withResource(new ColumnarToRowIterator(batchIter, NoopMetric, NoopMetric, NoopMetric,
NoopMetric)) { ctriter =>
val schema = Array(AttributeReference("longcol", LongType)().toAttribute)
val myIter = GeneratedInternalRowToCudfRowIterator(
ctriter, schema, TargetSize(Int.MaxValue),
NoopMetric, NoopMetric, NoopMetric, NoopMetric, NoopMetric)
// Do this so we can avoid forcing failures in any host allocations
// in ColumnarToRowIterator.hasNext()
assert(ctriter.hasNext)
// this forces a retry on the allocation of the offsetBuffer
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.CPU.ordinal, 1)
withResource(myIter.next()) { devBatch =>
withResource(buildBatch()) { expected =>
TestUtils.compareBatches(expected, devBatch)
}
}
assert(!GpuColumnVector.extractBases(batch).exists(_.getRefCount > 0))
assert(!myIter.hasNext)
assertResult(0)(RapidsBufferCatalog.getDeviceStorage.currentSize)
}
}

test("a split and retry when allocating dataBuffer is handled") {
val batch = buildBatch()
val batchIter = Seq(batch).iterator
withResource(new ColumnarToRowIterator(batchIter, NoopMetric, NoopMetric, NoopMetric,
NoopMetric)) { ctriter =>
val schema = Array(AttributeReference("longcol", LongType)().toAttribute)
val myIter = GeneratedInternalRowToCudfRowIterator(
ctriter, schema, TargetSize(Int.MaxValue),
NoopMetric, NoopMetric, NoopMetric, NoopMetric, NoopMetric)
// Do this so we can avoid forcing failures in any host allocations
// in ColumnarToRowIterator.hasNext()
assert(ctriter.hasNext)
// this forces a split retry on the allocation of the dataBuffer
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.CPU.ordinal, 0)
withResource(myIter.next()) { devBatch =>
withResource(buildBatch()) { expected =>
TestUtils.compareBatches(expected, devBatch)
}
}
assert(!GpuColumnVector.extractBases(batch).exists(_.getRefCount > 0))
assert(!myIter.hasNext)
assertResult(0)(RapidsBufferCatalog.getDeviceStorage.currentSize)
}
}
test("a split and retry when allocating offsetsBuffer is handled") {
val batch = buildBatch()
val batchIter = Seq(batch).iterator
withResource(new ColumnarToRowIterator(batchIter, NoopMetric, NoopMetric, NoopMetric,
NoopMetric)) { ctriter =>
val schema = Array(AttributeReference("longcol", LongType)().toAttribute)
val myIter = GeneratedInternalRowToCudfRowIterator(
ctriter, schema, TargetSize(Int.MaxValue),
NoopMetric, NoopMetric, NoopMetric, NoopMetric, NoopMetric)
// Do this so we can avoid forcing failures in any host allocations
// in ColumnarToRowIterator.hasNext()
assert(ctriter.hasNext)
// this forces a split retry on the allocation of the offsetsBuffer
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.CPU.ordinal, 1)
withResource(myIter.next()) { devBatch =>
withResource(buildBatch()) { expected =>
TestUtils.compareBatches(expected, devBatch)
}
}
assert(!GpuColumnVector.extractBases(batch).exists(_.getRefCount > 0))
assert(!myIter.hasNext)
assertResult(0)(RapidsBufferCatalog.getDeviceStorage.currentSize)
}
}
}
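
The new tests draw a deliberate contrast with the GPU-side case above: with TargetSize(Int.MaxValue) and a CPU-side injection on the host dataBuffer or offsetsBuffer allocation, both plain retries and split-and-retries recover and the output batch matches an uninjected run, while with TargetSize(1) and a GPU-side split-and-retry there is no smaller batch to fall back to, so the OOM surfaces. A condensed restatement of that throwing path, written as it would sit inside this suite (buildBatch, NoopMetric, TestUtils, and the rest are the suite's own helpers; nothing here is new API):

  test("sketch: GPU split-and-retry with no room to split propagates") {
    val batch = buildBatch()
    val batchIter = Seq(batch).iterator
    withResource(new ColumnarToRowIterator(batchIter, NoopMetric, NoopMetric,
      NoopMetric, NoopMetric)) { ctriter =>
      val schema = Array(AttributeReference("longcol", LongType)().toAttribute)
      // A goal size of one byte leaves the conversion nothing to split down to.
      val myIter = GeneratedInternalRowToCudfRowIterator(
        ctriter, schema, TargetSize(1),
        NoopMetric, NoopMetric, NoopMetric, NoopMetric, NoopMetric)
      // Inject one split-and-retry OOM on the next device (GPU) allocation.
      RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
        RmmSpark.OomInjectionType.GPU.ordinal, 0)
      // Unlike the CPU-side split injections, this one cannot be handled here,
      // so it propagates to the caller.
      assertThrows[GpuSplitAndRetryOOM] {
        myIter.next()
      }
      assertResult(0)(RapidsBufferCatalog.getDeviceStorage.currentSize)
    }
  }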
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -48,6 +48,7 @@ trait RmmSparkRetrySuiteBase extends AnyFunSuite with BeforeAndAfterEach {
val mockEventHandler = new BaseRmmEventHandler()
RmmSpark.setEventHandler(mockEventHandler)
RmmSpark.currentThreadIsDedicatedToTask(1)
+    HostAlloc.initialize(-1)
}

override def afterEach(): Unit = {
@@ -61,6 +62,7 @@ trait RmmSparkRetrySuiteBase extends AnyFunSuite with BeforeAndAfterEach {
if (rmmWasInitialized) {
Rmm.shutdown()
}
+    HostAlloc.initialize(-1)
}

private class BaseRmmEventHandler extends RmmEventHandler {
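
A closing note on this second file: the CPU-typed injections added above fail host allocations, so the shared RmmSparkRetrySuiteBase now resets host-allocation state on both setup and teardown. A sketch of that lifecycle, assuming HostAlloc.initialize(-1) restores the default, unlimited host allocator; the meaning of -1 is inferred from context here, not from documentation:

import org.scalatest.BeforeAndAfterEach
import org.scalatest.funsuite.AnyFunSuite

// HostAlloc is used unqualified in the diff; it is assumed to be the
// host-memory allocator from the suite's own package.
trait HostAllocResetSketch extends AnyFunSuite with BeforeAndAfterEach {
  override def beforeEach(): Unit = {
    HostAlloc.initialize(-1) // assumed: -1 means no host memory limit
  }
  override def afterEach(): Unit = {
    HostAlloc.initialize(-1) // leave a clean allocator for the next test
  }
}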
