Skip to content

Commit

Permalink
Change unit tests that force ooms to specify the oom type (gpu|cpu) (#…
Browse files Browse the repository at this point in the history
…10130)

Signed-off-by: Jim Brennan <jimb@nvidia.com>
  • Loading branch information
jbrennan333 authored Jan 3, 2024
1 parent 2a82b1e commit ed1fa9f
Show file tree
Hide file tree
Showing 15 changed files with 85 additions and 50 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -60,7 +60,8 @@ class BatchWithPartitionDataSuite extends RmmSparkRetrySuiteBase with SparkQuery
closeOnExcept(buildBatch(getSampleValueData)) { valueBatch =>
val resultBatchIter = BatchWithPartitionDataUtils.addPartitionValuesToBatch(valueBatch,
Array(1), partValues.take(1), partSchema, maxGpuColumnSizeBytes)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
withResource(resultBatchIter) { _ =>
assertThrows[GpuSplitAndRetryOOM] {
resultBatchIter.next()
Expand All @@ -85,7 +86,8 @@ class BatchWithPartitionDataSuite extends RmmSparkRetrySuiteBase with SparkQuery
val resultBatchIter = BatchWithPartitionDataUtils.addPartitionValuesToBatch(valueBatch,
partRows, partValues, partSchema, maxGpuColumnSizeBytes)
withResource(resultBatchIter) { _ =>
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
// Assert that the final count of rows matches expected batch
// We also need to close each batch coming from `resultBatchIter`.
val rowCounts = resultBatchIter.map(withResource(_){_.numRows()}).sum
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -40,7 +40,8 @@ class ColumnToRowIteratorRetrySuite extends RmmSparkRetrySuiteBase {
attrs,
Iterator(buildBatch),
NoopMetric, NoopMetric, NoopMetric, NoopMetric)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
var numRows = 0
aCol2RowIter.foreach { _ =>
numRows += 1
Expand All @@ -53,7 +54,8 @@ class ColumnToRowIteratorRetrySuite extends RmmSparkRetrySuiteBase {
attrs,
Iterator(buildBatch),
NoopMetric, NoopMetric, NoopMetric, NoopMetric)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
var numRows = 0
aCol2RowIter.foreach { _ =>
numRows += 1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,7 +29,8 @@ class CsvScanRetrySuite extends RmmSparkRetrySuiteBase {
val cudfSchema = GpuColumnVector.from(StructType(Seq(StructField("a", IntegerType),
StructField("b", IntegerType))))
val opts = CSVOptions.builder().hasHeader(false)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
val table = CSVPartitionReader.readToTable(bufferer, cudfSchema, NoopMetric,
opts, "CSV", null)
table.close()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -221,10 +221,12 @@ class GpuCoalesceBatchesRetrySuite

def injectError(injectRetry: Int, injectSplitAndRetry: Int): Unit = {
if (injectRetry > 0) {
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, injectRetry)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, injectRetry,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
}
if (injectSplitAndRetry > 0) {
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, injectSplitAndRetry)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, injectSplitAndRetry,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -211,7 +211,8 @@ class GpuSortRetrySuite extends RmmSparkRetrySuiteBase with MockitoSugar {
Iterator(buildBatch, buildBatch),
gpuSorter,
singleBatch = false)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 2)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 2,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
while (eachBatchIter.hasNext) {
var pos = 0
var curValue = 0
Expand All @@ -234,7 +235,8 @@ class GpuSortRetrySuite extends RmmSparkRetrySuiteBase with MockitoSugar {
inputIter,
gpuSorter,
singleBatch = false)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
assertThrows[GpuSplitAndRetryOOM] {
eachBatchIter.next()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -116,7 +116,8 @@ class HashAggregateRetrySuite

test("computeAndAggregate reduction with retry") {
val reductionBatch = buildReductionBatch()
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
val result = doReduction(reductionBatch)
withResource(result) { spillable =>
withResource(spillable.getColumnarBatch) { cb =>
Expand All @@ -133,7 +134,8 @@ class HashAggregateRetrySuite

test("computeAndAggregate reduction with two retries") {
val reductionBatch = buildReductionBatch()
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 2)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 2,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
val result = doReduction(reductionBatch)
withResource(result) { spillable =>
withResource(spillable.getColumnarBatch) { cb =>
Expand Down Expand Up @@ -162,7 +164,8 @@ class HashAggregateRetrySuite

test("computeAndAggregate group by with retry") {
val groupByBatch = buildGroupByBatch()
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
val result = doGroupBy(groupByBatch)
withResource(result) { spillable =>
withResource(spillable.getColumnarBatch) { cb =>
Expand Down Expand Up @@ -195,7 +198,8 @@ class HashAggregateRetrySuite

test("computeAndAggregate reduction with split and retry") {
val reductionBatch = buildReductionBatch()
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
val result = doReduction(reductionBatch)
withResource(result) { spillable =>
withResource(spillable.getColumnarBatch) { cb =>
Expand All @@ -213,7 +217,8 @@ class HashAggregateRetrySuite

test("computeAndAggregate group by with split retry") {
val groupByBatch = buildGroupByBatch()
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
val result = doGroupBy(groupByBatch)
withResource(result) { spillable =>
withResource(spillable.getColumnarBatch) { cb =>
Expand Down Expand Up @@ -248,7 +253,8 @@ class HashAggregateRetrySuite
// with forceMerge we expect 1 batch to be returned at all costs
val groupByBatch = buildGroupByBatch()
// we force a split because that would cause us to compute two aggs
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
val result = doGroupBy(groupByBatch, forceMerge = true)
withResource(result) { spillable =>
withResource(spillable.getColumnarBatch) { cb =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -46,7 +46,8 @@ class HostColumnToGpuRetrySuite extends RmmSparkRetrySuiteBase {
withResource(buildArrowIntColumn(allocator)) { arrowColumn =>
builder.copyColumnar(arrowColumn, 0, NUM_ROWS)
}
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
RmmRapidsRetryIterator.withRetryNoSplit[ColumnarBatch] {
builder.tryBuild(NUM_ROWS)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,7 +30,8 @@ class JsonScanRetrySuite extends RmmSparkRetrySuiteBase {
val cudfSchema = GpuColumnVector.from(StructType(Seq(StructField("a", IntegerType),
StructField("b", IntegerType))))
val opts = JSONOptions.builder().withLines(true).build()
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
val table = JsonPartitionReader.readToTable(bufferer, cudfSchema, NoopMetric,
opts, "JSON", null)
table.close()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -53,7 +53,8 @@ class LimitRetrySuite extends RmmSparkRetrySuiteBase {
val numRows = limit - offset
var curValue = offset
var pos = 0
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
assert(topNIter.hasNext)
withResource(topNIter.next()) { scb =>
withResource(scb.getColumnarBatch()) { cb =>
Expand Down Expand Up @@ -81,7 +82,8 @@ class LimitRetrySuite extends RmmSparkRetrySuiteBase {
limit, offset, NoopMetric, NoopMetric, NoopMetric)
var leftRows = if (limit > totalRows) totalRows - offset else limit - offset
var curValue = offset
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
while(limitIter.hasNext) {
var pos = 0
withResource(limitIter.next()) { cb =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -83,7 +83,8 @@ class NonDeterministicRetrySuite extends RmmSparkRetrySuiteBase {
}
closeOnExcept(sb) { _ =>
if (forceRetry) {
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
}
}
boundProjectList.projectAndCloseWithRetrySingleBatch(sb)
Expand Down Expand Up @@ -117,7 +118,8 @@ class NonDeterministicRetrySuite extends RmmSparkRetrySuiteBase {

val cb = buildBatch()
if (forceRetry) {
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
}
val batchSeq = GpuFilter.filterAndClose(cb, boundCondition,
NoopMetric, NoopMetric, NoopMetric).toSeq
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -73,7 +73,8 @@ class ProjectExprSuite extends SparkQueryCompareTestSuite {
"ret")()
val sb = buildProjectBatch()

RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
val result = GpuProjectExec.projectAndCloseWithRetrySingleBatch(sb, Seq(expr))
withResource(result) { cb =>
assertResult(4)(cb.numRows)
Expand Down Expand Up @@ -104,7 +105,8 @@ class ProjectExprSuite extends SparkQueryCompareTestSuite {
val tp = GpuBindReferences.bindGpuReferencesTiered(Seq(fullAdd), Seq(a, b), true)
val sb = buildProjectBatch()

RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
val result = tp.projectAndCloseWithRetrySingleBatch(sb)
withResource(result) { cb =>
assertResult(4)(cb.numRows)
Expand Down Expand Up @@ -138,7 +140,8 @@ class ProjectExprSuite extends SparkQueryCompareTestSuite {
val mockPlan = mock(classOf[SparkPlan])
when(mockPlan.output).thenReturn(Seq(a, b))
val ast = GpuProjectAstExec(List(expr.asInstanceOf[Expression]), mockPlan)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
withResource(sb) { sb =>
withResource(ast.buildRetryableAstIterator(Seq(sb.getColumnarBatch).iterator)) { result =>
withResource(result.next()) { cb =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -51,7 +51,8 @@ class RangeRetrySuite extends RmmSparkRetrySuiteBase {

test("GPU range iterator with split and retry OOM") {
val rangeIter = new GpuRangeIterator(start, end, step, maxRows, null, NoopMetric)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
// It should produce two batches, and rows numbers are
// 10 (=20/2) after retry, and
// 15 (25-10), the remaining ones.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,7 +28,8 @@ class RowToColumnarIteratorRetrySuite extends RmmSparkRetrySuiteBase {
val rowIter: Iterator[InternalRow] = (1 to 10).map(InternalRow(_)).toIterator
val row2ColIter = new RowToColumnarIterator(
rowIter, schema, RequireSingleBatch, new GpuRowToColumnConverter(schema))
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
Arm.withResource(row2ColIter.next()) { batch =>
assertResult(10)(batch.numRows())
}
Expand All @@ -38,7 +39,8 @@ class RowToColumnarIteratorRetrySuite extends RmmSparkRetrySuiteBase {
val rowIter: Iterator[InternalRow] = (1 to 10).map(InternalRow(_)).toIterator
val row2ColIter = new RowToColumnarIterator(
rowIter, schema, RequireSingleBatch, new GpuRowToColumnConverter(schema))
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
assertThrows[GpuSplitAndRetryOOM] {
row2ColIter.next()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -40,7 +40,8 @@ class ShufflePartitionerRetrySuite extends RmmSparkRetrySuiteBase {
val rrp = GpuRoundRobinPartitioning(partNum)
// batch will be closed within columnarEvalAny
val batch = buildBatch
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
var ret: Array[(ColumnarBatch, Int)] = null
try {
ret = rrp.columnarEvalAny(batch).asInstanceOf[Array[(ColumnarBatch, Int)]]
Expand Down Expand Up @@ -68,7 +69,8 @@ class ShufflePartitionerRetrySuite extends RmmSparkRetrySuiteBase {
val rp = GpuRangePartitioner(Array.apply(bounds), gpuSorter)
// batch will be closed within columnarEvalAny
val batch = buildBatch
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
var ret: Array[(ColumnarBatch, Int)] = null
try {
ret = rp.columnarEvalAny(batch).asInstanceOf[Array[(ColumnarBatch, Int)]]
Expand Down
Loading

0 comments on commit ed1fa9f

Please sign in to comment.