Change unit tests that force ooms to specify the oom type (gpu|cpu) #10130

Merged: 2 commits, Jan 3, 2024
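The pattern applied throughout this diff replaces the single-argument OOM-injection calls with a four-argument form that names the OOM type explicitly. A minimal before/after sketch, assuming the spark-rapids-jni RmmSpark API, with the parameter meanings (injection count, injection type ordinal, skip count) inferred from the calls in this diff rather than taken from that library's documentation:

import com.nvidia.spark.rapids.jni.RmmSpark

// Before: inject a retry OOM on the current thread, with no OOM type specified.
// RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId)

// After: inject exactly 1 OOM, tagged as a GPU OOM, skipping 0 allocations first.
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
  RmmSpark.OomInjectionType.GPU.ordinal, 0)

// The same four-argument form is used for split-and-retry injection.
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
  RmmSpark.OomInjectionType.GPU.ordinal, 0)

Tests that already passed an injection count (for example, forceRetryOOM(threadId, 2) in GpuSortRetrySuite) keep that count and simply gain the injection type and skip count arguments.
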
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -60,7 +60,8 @@ class BatchWithPartitionDataSuite extends RmmSparkRetrySuiteBase with SparkQuery
closeOnExcept(buildBatch(getSampleValueData)) { valueBatch =>
val resultBatchIter = BatchWithPartitionDataUtils.addPartitionValuesToBatch(valueBatch,
Array(1), partValues.take(1), partSchema, maxGpuColumnSizeBytes)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
withResource(resultBatchIter) { _ =>
assertThrows[GpuSplitAndRetryOOM] {
resultBatchIter.next()
@@ -85,7 +86,8 @@ class BatchWithPartitionDataSuite extends RmmSparkRetrySuiteBase with SparkQuery
val resultBatchIter = BatchWithPartitionDataUtils.addPartitionValuesToBatch(valueBatch,
partRows, partValues, partSchema, maxGpuColumnSizeBytes)
withResource(resultBatchIter) { _ =>
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
// Assert that the final count of rows matches expected batch
// We also need to close each batch coming from `resultBatchIter`.
val rowCounts = resultBatchIter.map(withResource(_){_.numRows()}).sum
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -40,7 +40,8 @@ class ColumnToRowIteratorRetrySuite extends RmmSparkRetrySuiteBase
attrs,
Iterator(buildBatch),
NoopMetric, NoopMetric, NoopMetric, NoopMetric)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
var numRows = 0
aCol2RowIter.foreach { _ =>
numRows += 1
@@ -53,7 +54,8 @@ class ColumnToRowIteratorRetrySuite extends RmmSparkRetrySuiteBase
attrs,
Iterator(buildBatch),
NoopMetric, NoopMetric, NoopMetric, NoopMetric)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
var numRows = 0
aCol2RowIter.foreach { _ =>
numRows += 1
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,7 +29,8 @@ class CsvScanRetrySuite extends RmmSparkRetrySuiteBase
val cudfSchema = GpuColumnVector.from(StructType(Seq(StructField("a", IntegerType),
StructField("b", IntegerType))))
val opts = CSVOptions.builder().hasHeader(false)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
val table = CSVPartitionReader.readToTable(bufferer, cudfSchema, NoopMetric,
opts, "CSV", null)
table.close()
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -221,10 +221,12 @@ class GpuCoalesceBatchesRetrySuite

def injectError(injectRetry: Int, injectSplitAndRetry: Int): Unit = {
if (injectRetry > 0) {
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, injectRetry)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, injectRetry,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
}
if (injectSplitAndRetry > 0) {
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, injectSplitAndRetry)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, injectSplitAndRetry,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
}
}

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -211,7 +211,8 @@ class GpuSortRetrySuite extends RmmSparkRetrySuiteBase with MockitoSugar
Iterator(buildBatch, buildBatch),
gpuSorter,
singleBatch = false)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 2)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 2,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
while (eachBatchIter.hasNext) {
var pos = 0
var curValue = 0
@@ -234,7 +235,8 @@ class GpuSortRetrySuite extends RmmSparkRetrySuiteBase with MockitoSugar
inputIter,
gpuSorter,
singleBatch = false)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
assertThrows[GpuSplitAndRetryOOM] {
eachBatchIter.next()
}
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -116,7 +116,8 @@ class HashAggregateRetrySuite

test("computeAndAggregate reduction with retry") {
val reductionBatch = buildReductionBatch()
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
val result = doReduction(reductionBatch)
withResource(result) { spillable =>
withResource(spillable.getColumnarBatch) { cb =>
@@ -133,7 +134,8 @@ class HashAggregateRetrySuite

test("computeAndAggregate reduction with two retries") {
val reductionBatch = buildReductionBatch()
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 2)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 2,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
val result = doReduction(reductionBatch)
withResource(result) { spillable =>
withResource(spillable.getColumnarBatch) { cb =>
@@ -162,7 +164,8 @@ class HashAggregateRetrySuite

test("computeAndAggregate group by with retry") {
val groupByBatch = buildGroupByBatch()
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
val result = doGroupBy(groupByBatch)
withResource(result) { spillable =>
withResource(spillable.getColumnarBatch) { cb =>
@@ -195,7 +198,8 @@ class HashAggregateRetrySuite

test("computeAndAggregate reduction with split and retry") {
val reductionBatch = buildReductionBatch()
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
val result = doReduction(reductionBatch)
withResource(result) { spillable =>
withResource(spillable.getColumnarBatch) { cb =>
@@ -213,7 +217,8 @@ class HashAggregateRetrySuite

test("computeAndAggregate group by with split retry") {
val groupByBatch = buildGroupByBatch()
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
val result = doGroupBy(groupByBatch)
withResource(result) { spillable =>
withResource(spillable.getColumnarBatch) { cb =>
@@ -248,7 +253,8 @@ class HashAggregateRetrySuite
// with forceMerge we expect 1 batch to be returned at all costs
val groupByBatch = buildGroupByBatch()
// we force a split because that would cause us to compute two aggs
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
val result = doGroupBy(groupByBatch, forceMerge = true)
withResource(result) { spillable =>
withResource(spillable.getColumnarBatch) { cb =>
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -46,7 +46,8 @@ class HostColumnToGpuRetrySuite extends RmmSparkRetrySuiteBase
withResource(buildArrowIntColumn(allocator)) { arrowColumn =>
builder.copyColumnar(arrowColumn, 0, NUM_ROWS)
}
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
RmmRapidsRetryIterator.withRetryNoSplit[ColumnarBatch] {
builder.tryBuild(NUM_ROWS)
}
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -30,7 +30,8 @@ class JsonScanRetrySuite extends RmmSparkRetrySuiteBase
val cudfSchema = GpuColumnVector.from(StructType(Seq(StructField("a", IntegerType),
StructField("b", IntegerType))))
val opts = JSONOptions.builder().withLines(true).build()
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
val table = JsonPartitionReader.readToTable(bufferer, cudfSchema, NoopMetric,
opts, "JSON", null)
table.close()
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -53,7 +53,8 @@ class LimitRetrySuite extends RmmSparkRetrySuiteBase
val numRows = limit - offset
var curValue = offset
var pos = 0
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
assert(topNIter.hasNext)
withResource(topNIter.next()) { scb =>
withResource(scb.getColumnarBatch()) { cb =>
@@ -81,7 +82,8 @@ class LimitRetrySuite extends RmmSparkRetrySuiteBase
limit, offset, NoopMetric, NoopMetric, NoopMetric)
var leftRows = if (limit > totalRows) totalRows - offset else limit - offset
var curValue = offset
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
while(limitIter.hasNext) {
var pos = 0
withResource(limitIter.next()) { cb =>
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -83,7 +83,8 @@ class NonDeterministicRetrySuite extends RmmSparkRetrySuiteBase
}
closeOnExcept(sb) { _ =>
if (forceRetry) {
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
}
}
boundProjectList.projectAndCloseWithRetrySingleBatch(sb)
@@ -117,7 +118,8 @@ class NonDeterministicRetrySuite extends RmmSparkRetrySuiteBase

val cb = buildBatch()
if (forceRetry) {
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
}
val batchSeq = GpuFilter.filterAndClose(cb, boundCondition,
NoopMetric, NoopMetric, NoopMetric).toSeq
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -73,7 +73,8 @@ class ProjectExprSuite extends SparkQueryCompareTestSuite
"ret")()
val sb = buildProjectBatch()

RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
val result = GpuProjectExec.projectAndCloseWithRetrySingleBatch(sb, Seq(expr))
withResource(result) { cb =>
assertResult(4)(cb.numRows)
@@ -104,7 +105,8 @@ class ProjectExprSuite extends SparkQueryCompareTestSuite
val tp = GpuBindReferences.bindGpuReferencesTiered(Seq(fullAdd), Seq(a, b), true)
val sb = buildProjectBatch()

RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
val result = tp.projectAndCloseWithRetrySingleBatch(sb)
withResource(result) { cb =>
assertResult(4)(cb.numRows)
@@ -138,7 +140,8 @@ class ProjectExprSuite extends SparkQueryCompareTestSuite
val mockPlan = mock(classOf[SparkPlan])
when(mockPlan.output).thenReturn(Seq(a, b))
val ast = GpuProjectAstExec(List(expr.asInstanceOf[Expression]), mockPlan)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
withResource(sb) { sb =>
withResource(ast.buildRetryableAstIterator(Seq(sb.getColumnarBatch).iterator)) { result =>
withResource(result.next()) { cb =>
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -51,7 +51,8 @@ class RangeRetrySuite extends RmmSparkRetrySuiteBase

test("GPU range iterator with split and retry OOM") {
val rangeIter = new GpuRangeIterator(start, end, step, maxRows, null, NoopMetric)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
// It should produce two batches, and rows numbers are
// 10 (=20/2) after retry, and
// 15 (25-10), the remaining ones.
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -28,7 +28,8 @@ class RowToColumnarIteratorRetrySuite extends RmmSparkRetrySuiteBase
val rowIter: Iterator[InternalRow] = (1 to 10).map(InternalRow(_)).toIterator
val row2ColIter = new RowToColumnarIterator(
rowIter, schema, RequireSingleBatch, new GpuRowToColumnConverter(schema))
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
Arm.withResource(row2ColIter.next()) { batch =>
assertResult(10)(batch.numRows())
}
@@ -38,7 +39,8 @@ class RowToColumnarIteratorRetrySuite extends RmmSparkRetrySuiteBase
val rowIter: Iterator[InternalRow] = (1 to 10).map(InternalRow(_)).toIterator
val row2ColIter = new RowToColumnarIterator(
rowIter, schema, RequireSingleBatch, new GpuRowToColumnConverter(schema))
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId)
RmmSpark.forceSplitAndRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
assertThrows[GpuSplitAndRetryOOM] {
row2ColIter.next()
}
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -40,7 +40,8 @@ class ShufflePartitionerRetrySuite extends RmmSparkRetrySuiteBase
val rrp = GpuRoundRobinPartitioning(partNum)
// batch will be closed within columnarEvalAny
val batch = buildBatch
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
var ret: Array[(ColumnarBatch, Int)] = null
try {
ret = rrp.columnarEvalAny(batch).asInstanceOf[Array[(ColumnarBatch, Int)]]
@@ -68,7 +69,8 @@ class ShufflePartitionerRetrySuite extends RmmSparkRetrySuiteBase
val rp = GpuRangePartitioner(Array.apply(bounds), gpuSorter)
// batch will be closed within columnarEvalAny
val batch = buildBatch
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1)
RmmSpark.forceRetryOOM(RmmSpark.getCurrentThreadId, 1,
RmmSpark.OomInjectionType.GPU.ordinal, 0)
var ret: Array[(ColumnarBatch, Int)] = null
try {
ret = rp.columnarEvalAny(batch).asInstanceOf[Array[(ColumnarBatch, Int)]]