Heuristic to speed up partial aggregates that get larger (NVIDIA#8618)
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
revans2 authored Jun 29, 2023
1 parent e04486f commit 33c63fd
Showing 15 changed files with 990 additions and 403 deletions.
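This commit adds a heuristic-driven optimization for partial (map-side) aggregations whose output grows relative to their input: when the heuristic indicates it is likely to help, the input is pre-sorted on the grouping keys so the aggregation can run in a single pass with no merge step and no spilling (see the new RapidsConf entries and the new integration test below). As a rough sketch of the kind of query this targets, modeled on the new test rather than taken from the commit, an assumed SparkSession `spark` runs an aggregation whose grouping key is widened by a cast:

// Hypothetical example (names and sizes are assumptions, assuming an active SparkSession `spark`):
// many distinct long keys are widened to DECIMAL(38,5), so each partial-aggregation output row is
// at least as wide as the input row it came from and the partial agg does not shrink the data much.
val df = spark.range(0L, 10000000L)
  .selectExpr("CAST(id div 10 AS DECIMAL(38,5)) AS key", "id AS value")
df.groupBy("key").sum("value").collect()

The new spark.rapids.sql.agg.forceSinglePassPartialSort config added below forces the pre-sorted path regardless of the heuristic, which is what the new test relies on.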
2 changes: 1 addition & 1 deletion build/buildall
@@ -242,7 +242,7 @@ function build_single_shim() {
-Dskip \
-Dmaven.scalastyle.skip="$SKIP_CHECKS" \
-pl aggregator -am > "$LOG_FILE" 2>&1 || {
[[ "$LOG_FILE" != "/dev/tty" ]] && tail -20 "$LOG_FILE" || true
[[ "$LOG_FILE" != "/dev/tty" ]] && echo "$LOG_FILE:" && tail -20 "$LOG_FILE" || true
exit 255
}
}
4 changes: 2 additions & 2 deletions docs/additional-functionality/advanced_configs.md
@@ -6,12 +6,12 @@ nav_order: 10
---
<!-- Generated by RapidsConf.help. DO NOT EDIT! -->
# RAPIDS Accelerator for Apache Spark Advanced Configuration
Most users will not need to modify the configuration options listed below.
Most users will not need to modify the configuration options listed below.
They are documented here for completeness and advanced usage.

The following configuration options are supported by the RAPIDS Accelerator for Apache Spark.

For commonly used configurations and examples of setting options, please refer to the
For commonly used configurations and examples of setting options, please refer to the
[RAPIDS Accelerator for Configuration](../configs.md) page.


12 changes: 12 additions & 0 deletions integration_tests/src/main/python/hash_aggregate_test.py
@@ -1883,3 +1883,15 @@ def test_hash_aggregate_complete_with_grouping_expressions():
lambda spark : spark.range(10).withColumn("id2", f.col("id")),
"hash_agg_complete_table",
"select id, avg(id) from hash_agg_complete_table group by id, id2 + 1")

@ignore_order(local=True)
@pytest.mark.parametrize('cast_key_to', ["byte", "short", "int",
"long", "string", "DECIMAL(38,5)"], ids=idfn)
def test_hash_agg_force_pre_sort(cast_key_to):
def do_it(spark):
gen = StructGen([("key", UniqueLongGen()), ("value", long_gen)], nullable=False)
df = gen_df(spark, gen)
return df.selectExpr("CAST((key div 10) as " + cast_key_to + ") as key", "value").groupBy("key").sum("value")
assert_gpu_and_cpu_are_equal_collect(do_it,
conf={'spark.rapids.sql.agg.forceSinglePassPartialSort': True,
'spark.rapids.sql.agg.singlePassPartialSortEnabled': True})
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -63,6 +63,32 @@ object GpuBatchUtils {
estimateGpuMemory(field.dataType, field.nullable, rowCount)
}

/**
* Get the minimum possible size, in bytes, for a column with the given type, nullability,
* and row count.
*/
def minGpuMemory(dataType: DataType, nullable: Boolean, rowCount: Long): Long = {
val validityBufferSize = if (nullable) {
calculateValidityBufferSize(rowCount)
} else {
0
}

val dataSize = dataType match {
case DataTypes.BinaryType | DataTypes.StringType | _: MapType | _: ArrayType =>
// For nested types (like list or string) the smallest possible size is when
// each row is empty (length 0). In that case there is no data, just offsets
// and all of the offsets are 0.
calculateOffsetBufferSize(rowCount)
case dt: StructType =>
dt.fields.map { f =>
minGpuMemory(f.dataType, f.nullable, rowCount)
}.sum
case dt =>
dt.defaultSize * rowCount
}
dataSize + validityBufferSize
}

def estimateGpuMemory(dataType: DataType, nullable: Boolean, rowCount: Long): Long = {
val validityBufferSize = if (nullable) {
calculateValidityBufferSize(rowCount)
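The new minGpuMemory complements the existing estimateGpuMemory by giving a lower bound: for variable-width types it assumes every row is empty, so only the offset and validity buffers are counted. A minimal sketch of a hypothetical caller (the schema, row count, and comparison are illustrative assumptions, not code from this commit):

import com.nvidia.spark.rapids.GpuBatchUtils
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("key", StringType, nullable = true),
  StructField("value", LongType, nullable = false)))
val rows = 1000000L

// Lower bound: the STRING column contributes only offsets plus a validity mask,
// because every row could be the empty string; the LONG column is fixed width.
val lowerBound = schema.fields.map(f =>
  GpuBatchUtils.minGpuMemory(f.dataType, f.nullable, rows)).sum

// The pre-existing estimate also budgets data bytes for the string payloads,
// so it can only be larger than (or equal to) the lower bound.
val estimate = schema.fields.map(f =>
  GpuBatchUtils.estimateGpuMemory(f.dataType, f.nullable, rows)).sum
assert(lowerBound <= estimate)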
@@ -61,6 +61,11 @@ object GpuDeviceManager extends Logging {
*/
def getDeviceId(): Option[Int] = deviceId

@volatile private var poolSizeLimit = 0L

// Never split below 100 MiB (but this is really just for testing)
def getSplitUntilSize: Long = Math.max(poolSizeLimit / 8, 100 * 1024 * 1024)

// Attempt to set and acquire the gpu, return true if acquired, false otherwise
def tryToSetGpuDeviceAndAcquire(addr: Int): Boolean = {
try {
@@ -149,6 +154,7 @@

chunkedPackMemoryResource.foreach(_.close)
chunkedPackMemoryResource = None
poolSizeLimit = 0L

RapidsBufferCatalog.close()
GpuShuffleEnv.shutdown()
@@ -338,6 +344,7 @@

Cuda.setDevice(gpuId)
try {
poolSizeLimit = poolAllocation
Rmm.initialize(init, logConf, poolAllocation)
} catch {
case firstEx: CudfException if ((init & RmmAllocationMode.CUDA_ASYNC) != 0) => {
@@ -351,6 +358,7 @@
logError(
"Failed to initialize RMM with either ASYNC or ARENA allocators. Exiting...")
secondEx.addSuppressed(firstEx)
poolSizeLimit = 0L
throw secondEx
}
}
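getSplitUntilSize ties a minimum split size to the RMM pool that was just initialized: one eighth of the pool, with a 100 MiB floor that mostly matters for tests that run with tiny pools. The code that consumes this limit is in files not rendered above. A quick worked example with assumed pool sizes (not values from the commit):

// Mirrors the new getSplitUntilSize logic for two hypothetical pool sizes.
def splitUntilSize(poolSizeLimit: Long): Long =
  math.max(poolSizeLimit / 8, 100L * 1024 * 1024)

splitUntilSize(24L * 1024 * 1024 * 1024)  // 3221225472 bytes (3 GiB) for a 24 GiB pool
splitUntilSize(512L * 1024 * 1024)        // 104857600 bytes (100 MiB floor) for a 512 MiB pool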
@@ -136,8 +136,7 @@ case class GpuSortExec(
val singleBatch = sortType == FullSortSingleBatch
child.executeColumnar().mapPartitions { cbIter =>
if (outOfCore) {
val cpuOrd = new LazilyGeneratedOrdering(sorter.cpuOrdering)
val iter = GpuOutOfCoreSortIterator(cbIter, sorter, cpuOrd,
val iter = GpuOutOfCoreSortIterator(cbIter, sorter,
targetSize, opTime, sortTime, outputBatch, outputRows)
TaskContext.get().addTaskCompletionListener(_ -> iter.close())
iter
@@ -239,14 +238,14 @@ class Pending(cpuOrd: LazilyGeneratedOrdering) extends AutoCloseable {
case class GpuOutOfCoreSortIterator(
iter: Iterator[ColumnarBatch],
sorter: GpuSorter,
cpuOrd: LazilyGeneratedOrdering,
targetSize: Long,
opTime: GpuMetric,
sortTime: GpuMetric,
outputBatches: GpuMetric,
outputRows: GpuMetric) extends Iterator[ColumnarBatch]
with AutoCloseable {

private val cpuOrd = new LazilyGeneratedOrdering(sorter.cpuOrdering)
// A priority queue of data that is not merged yet.
private val pending = new Pending(cpuOrd)

@@ -328,16 +327,16 @@ case class GpuOutOfCoreSortIterator(
targetRowCount until rows by targetRowCount
}
// Get back the first row so we can sort the batches
val gatherIndexes = if (hasFullySortedData) {
val lowerGatherIndexes = if (hasFullySortedData) {
// The first batch is sorted so don't gather a row for it
splitIndexes
} else {
Seq(0) ++ splitIndexes
}

val boundaries =
withResource(new NvtxRange("boundaries", NvtxColor.ORANGE)) { _ =>
withResource(ColumnVector.fromInts(gatherIndexes: _*)) { gatherMap =>
val lowerBoundaries =
withResource(new NvtxRange("lower boundaries", NvtxColor.ORANGE)) { _ =>
withResource(ColumnVector.fromInts(lowerGatherIndexes: _*)) { gatherMap =>
withResource(sortedTbl.gather(gatherMap)) { boundariesTab =>
convertBoundaries(boundariesTab)
}
@@ -355,9 +354,9 @@
}

closeOnExcept(sortedCb) { _ =>
assert(boundaries.length == stillPending.length)
assert(lowerBoundaries.length == stillPending.length)
closeOnExcept(pendingObs) { _ =>
stillPending.zip(boundaries).foreach {
stillPending.zip(lowerBoundaries).foreach {
case (ct: ContiguousTable, lower: UnsafeRow) =>
if (ct.getRowCount > 0) {
val sp = SpillableColumnarBatch(ct, sorter.projectedBatchTypes,
25 changes: 23 additions & 2 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -1273,6 +1273,23 @@ object RapidsConf {
.booleanConf
.createWithDefault(true)

val ENABLE_SINGLE_PASS_PARTIAL_SORT_AGG: ConfEntryWithDefault[Boolean] =
conf("spark.rapids.sql.agg.singlePassPartialSortEnabled")
.doc("Enable or disable a single pass partial sort optimization where if a heuristic " +
"indicates it would be good we pre-sort the data before a partial agg and then " +
"do the agg in a single pass with no merge, so there is no spilling")
.internal()
.booleanConf
.createWithDefault(true)

val FORCE_SINGLE_PASS_PARTIAL_SORT_AGG: ConfEntryWithDefault[Boolean] =
conf("spark.rapids.sql.agg.forceSinglePassPartialSort")
.doc("Force a single pass partial sort agg to happen in all cases that it could, " +
"no matter what the heuristic says. This is really just for testing.")
.internal()
.booleanConf
.createWithDefault(false)

val ENABLE_REGEXP = conf("spark.rapids.sql.regexp.enabled")
.doc("Specifies whether supported regular expressions will be evaluated on the GPU. " +
"Unsupported expressions will fall back to CPU. However, there are some known edge cases " +
@@ -2007,12 +2024,12 @@
println(s"<!-- Generated by RapidsConf.help. DO NOT EDIT! -->")
// scalastyle:off line.size.limit
println("""# RAPIDS Accelerator for Apache Spark Advanced Configuration
|Most users will not need to modify the configuration options listed below.
|Most users will not need to modify the configuration options listed below.
|They are documented here for completeness and advanced usage.
|
|The following configuration options are supported by the RAPIDS Accelerator for Apache Spark.
|
|For commonly used configurations and examples of setting options, please refer to the
|For commonly used configurations and examples of setting options, please refer to the
|[RAPIDS Accelerator for Configuration](../configs.md) page.
|""".stripMargin)
// scalastyle:on line.size.limit
@@ -2554,6 +2571,10 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val isRangeWindowDecimalEnabled: Boolean = get(ENABLE_RANGE_WINDOW_DECIMAL)

lazy val allowSinglePassPartialSortAgg: Boolean = get(ENABLE_SINGLE_PASS_PARTIAL_SORT_AGG)

lazy val forceSinglePassPartialSortAgg: Boolean = get(FORCE_SINGLE_PASS_PARTIAL_SORT_AGG)

lazy val isRegExpEnabled: Boolean = get(ENABLE_REGEXP)

lazy val maxRegExpStateMemory: Long = {
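The two new config entries surface as allowSinglePassPartialSortAgg and forceSinglePassPartialSortAgg on RapidsConf. The aggregate-planning code that consumes them is in files not rendered above, so the following gating sketch is an assumption rather than the commit's actual logic:

import com.nvidia.spark.rapids.RapidsConf

// Hypothetical helper: the force flag wins unconditionally (intended for testing);
// otherwise the enable flag must be on and the heuristic must favor the pre-sorted path.
def shouldPreSortForSinglePassAgg(conf: RapidsConf, heuristicFavorsSort: Boolean): Boolean =
  conf.forceSinglePassPartialSortAgg ||
    (conf.allowSinglePassPartialSortAgg && heuristicFavorsSort)

The integration test added above sets both spark.rapids.sql.agg.forceSinglePassPartialSort and spark.rapids.sql.agg.singlePassPartialSortEnabled to true so the pre-sorted path is exercised for every parametrized key type.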
@@ -1418,7 +1418,8 @@ final class RuleNotFoundExprMeta[INPUT <: Expression](
willNotWorkOnGpu(s"GPU does not currently support the operator ${expr.getClass}")

override def convertToGpu(): GpuExpression =
throw new IllegalStateException("Cannot be converted to GPU")
throw new IllegalStateException(s"Cannot be converted to GPU ${expr.getClass} " +
s"${expr.dataType} $expr")
}

/** Base class for metadata around `RunnableCommand`. */