
Batching support for row-based bounded window functions #9973

Merged Dec 13, 2023
25 commits (changes shown from 15 commits)
ae3eec0
Initial swipe: Batched, bounded windows.
mythrocks Nov 14, 2023
2aa3f5d
Bounded windows routed to BatchedBoundedWindowExec.
mythrocks Nov 14, 2023
cb45866
First swipe at new version.
mythrocks Nov 20, 2023
c3cb1a4
Mostly working, but buggy. Losing some rows in the output.
mythrocks Nov 22, 2023
1230fed
Fixed up the math. Looks to be working at 150M.
mythrocks Dec 1, 2023
159dc62
Minor refactor/cleanup.
mythrocks Dec 4, 2023
392e7c6
Clearing cache on task completion.
mythrocks Dec 4, 2023
5dbbdd2
Fixed leak from trim().
mythrocks Dec 5, 2023
d00747f
Document onTaskCompletion.
mythrocks Dec 5, 2023
b5bd065
Optimization: Skip window kernel if no output for current batch.
mythrocks Dec 5, 2023
94d9bc4
Removed commented code, prints.
mythrocks Dec 5, 2023
60245fe
Switched to exposing negative minPreceding.
mythrocks Dec 5, 2023
682afdc
Removed incorrect error message.
mythrocks Dec 6, 2023
4767080
Tests for varying finite window combinations.
mythrocks Dec 6, 2023
fa5ab16
Tests for unpartitioned cases.
mythrocks Dec 6, 2023
3f50224
Fixed leak in concat.
mythrocks Dec 6, 2023
e08cfa2
Test that large extents fall back to GpuWindowExec.
mythrocks Dec 6, 2023
e0581c0
Fix build break with Scala 2.13.
mythrocks Dec 6, 2023
0ba8be6
Support for negative offsets.
mythrocks Dec 7, 2023
b5fda09
Removed erroneous batching.
mythrocks Dec 7, 2023
f9de13a
Config changes:
mythrocks Dec 7, 2023
31896e3
Merge remote-tracking branch 'origin/branch-24.02' into batched-bound…
mythrocks Dec 7, 2023
4a5f5c5
Docs update for batched row window config.
mythrocks Dec 7, 2023
4246cbb
Merge remote-tracking branch 'origin/branch-24.02' into batched-bound…
mythrocks Dec 12, 2023
5d475ca
Fixed output column order. This fixes the empty output problem.
mythrocks Dec 13, 2023
66 changes: 66 additions & 0 deletions integration_tests/src/main/python/window_function_test.py
@@ -1807,6 +1807,72 @@ def test_window_aggs_for_negative_rows_unpartitioned(data_gen, batch_size):
conf=conf)


@ignore_order(local=True)
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn)
@pytest.mark.parametrize('data_gen', [
_grpkey_short_with_nulls,
_grpkey_int_with_nulls,
_grpkey_long_with_nulls,
_grpkey_date_with_nulls,
], ids=idfn)
def test_window_aggs_for_batched_finite_row_windows_partitioned(data_gen, batch_size):
conf = {'spark.rapids.sql.batchSizeBytes': batch_size}
assert_gpu_and_cpu_are_equal_sql(
lambda spark: gen_df(spark, data_gen, length=2048),
'window_agg_table',
"""
SELECT
COUNT(1) OVER (PARTITION BY a ORDER BY b,c ASC
ROWS BETWEEN CURRENT ROW AND 100 FOLLOWING) AS count_1_asc,
COUNT(c) OVER (PARTITION BY a ORDER BY b,c ASC
ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) AS count_b_asc,
SUM(c) OVER (PARTITION BY a ORDER BY b,c ASC
ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) AS sum_c_asc,
AVG(c) OVER (PARTITION BY a ORDER BY b,c ASC
ROWS BETWEEN 10 PRECEDING AND 30 FOLLOWING) AS avg_b_asc,
MAX(c) OVER (PARTITION BY a ORDER BY b,c DESC
ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) AS max_b_desc,
MIN(c) OVER (PARTITION BY a ORDER BY b,c ASC
ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) AS min_b_asc
FROM window_agg_table
""",
validate_execs_in_gpu_plan=['GpuBatchedBoundedWindowExec'],
conf=conf)


@ignore_order(local=True)
@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn)
@pytest.mark.parametrize('data_gen', [
_grpkey_short_with_nulls,
_grpkey_int_with_nulls,
_grpkey_long_with_nulls,
_grpkey_date_with_nulls,
], ids=idfn)
def test_window_aggs_for_batched_finite_row_windows_unpartitioned(data_gen, batch_size):
conf = {'spark.rapids.sql.batchSizeBytes': batch_size}
assert_gpu_and_cpu_are_equal_sql(
lambda spark: gen_df(spark, data_gen, length=2048),
'window_agg_table',
"""
SELECT
COUNT(1) OVER (ORDER BY b,c,a ASC
ROWS BETWEEN CURRENT ROW AND 100 FOLLOWING) AS count_1_asc,
COUNT(c) OVER (PARTITION BY a ORDER BY b,c,a ASC
ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) AS count_b_asc,
SUM(c) OVER (PARTITION BY a ORDER BY b,c,a ASC
ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) AS sum_c_asc,
AVG(c) OVER (PARTITION BY a ORDER BY b,c,a ASC
ROWS BETWEEN 10 PRECEDING AND 30 FOLLOWING) AS avg_b_asc,
MAX(c) OVER (PARTITION BY a ORDER BY b,c,a DESC
ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) AS max_b_desc,
MIN(c) OVER (PARTITION BY a ORDER BY b,c,a ASC
ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) AS min_b_asc
FROM window_agg_table
""",
validate_execs_in_gpu_plan=['GpuBatchedBoundedWindowExec'],
conf=conf)


def test_lru_cache_datagen():
# log cache info at the end of integration tests, not related to window functions
info = gen_df_help.cache_info()
@@ -0,0 +1,248 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids

import ai.rapids.cudf.{ColumnVector => CudfColumnVector, NvtxColor, Table => CudfTable}
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression, SortOrder}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch

class GpuBatchedBoundedWindowIterator(
input: Iterator[ColumnarBatch],
override val boundWindowOps: Seq[GpuExpression],
override val boundPartitionSpec: Seq[GpuExpression],
override val boundOrderSpec: Seq[SortOrder],
val outputTypes: Array[DataType],
minPreceding: Int,
maxFollowing: Int,
numOutputBatches: GpuMetric,
numOutputRows: GpuMetric,
opTime: GpuMetric) extends Iterator[ColumnarBatch] with BasicWindowCalc with Logging {

override def isRunningBatched: Boolean = false // Not "Running Window" optimized.
// This is strictly for batching.

override def hasNext: Boolean = numUnprocessedInCache > 0 || input.hasNext

var cached: Option[Array[CudfColumnVector]] = None // For processing with the next batch.

private var numUnprocessedInCache: Int = 0 // numRows at the bottom not processed completely.
private var numPrecedingRowsAdded: Int = 0 // numRows at the top, added for preceding context.

// Register handler to clean up cache when task completes.
Option(TaskContext.get()).foreach { tc =>
onTaskCompletion(tc) {
clearCached()
}
}

// Caches input column schema on first read.
var inputTypes: Option[Array[DataType]] = None

// Clears cached column vectors, after consumption.
private def clearCached(): Unit = {
cached.foreach(_.foreach(_.close))
cached = None
}

private def getNextInputBatch: SpillableColumnarBatch = {
// Sets column batch types using the types cached from the
// first input column read.
def optionallySetInputTypes(inputCB: ColumnarBatch): Unit = {
if (inputTypes.isEmpty) {
inputTypes = Some(GpuColumnVector.extractTypes(inputCB))
}
}

// Reads fresh batch from iterator, initializes input data-types if necessary.
def getFreshInputBatch: ColumnarBatch = {
val freshBatch = input.next()
optionallySetInputTypes(freshBatch)
freshBatch
}

def concatenateColumns(cached: Array[CudfColumnVector],
freshBatchTable: CudfTable)
: Array[CudfColumnVector] = {

if (cached.length != freshBatchTable.getNumberOfColumns) {
throw new IllegalArgumentException("Expected the same number of columns " +
"in input batch and cached batch.")
}
cached.zipWithIndex.map { case (cachedCol, idx) =>
CudfColumnVector.concatenate(cachedCol, freshBatchTable.getColumn(idx))
}
}

// Either cached has unprocessed rows, or input.hasNext().
if (input.hasNext) {
if (cached.isDefined) {
// Cached input AND new input rows exist. Return concat-ed rows.
withResource(getFreshInputBatch) { freshBatchCB =>
withResource(GpuColumnVector.from(freshBatchCB)) { freshBatchTable =>
val concat = concatenateColumns(cached.get, freshBatchTable)
clearCached()
SpillableColumnarBatch(convertToBatch(inputTypes.get, concat),
SpillPriorities.ACTIVE_BATCHING_PRIORITY)
}
}
} else {
// No cached input available. Return fresh input rows, only.
SpillableColumnarBatch(getFreshInputBatch,
SpillPriorities.ACTIVE_BATCHING_PRIORITY)
}
} else {
// No fresh input available. Return cached input.
val cachedCB = convertToBatch(inputTypes.get, cached.get)
clearCached()
SpillableColumnarBatch(cachedCB,
SpillPriorities.ACTIVE_BATCHING_PRIORITY)
}
}

/**
* Helper to trim specified number of rows off the top and bottom,
* of all specified columns.
*/
private def trim(columns: Array[CudfColumnVector],
offTheTop: Int,
offTheBottom: Int): Array[CudfColumnVector] = {

def checkValidSizes(col: CudfColumnVector): Unit =
if ((offTheTop + offTheBottom) > col.getRowCount) {
throw new IllegalArgumentException(s"Cannot trim column of size ${col.getRowCount} by " +
s"$offTheTop rows at the top, and $offTheBottom rows at the bottom.")
}

columns.map { col =>
checkValidSizes(col)
col.subVector(offTheTop, col.getRowCount.toInt - offTheBottom)
}
}

private def resetInputCache(newCache: Option[Array[CudfColumnVector]],
newPrecedingAdded: Int): Unit = {
cached.foreach(_.foreach(_.close))
cached = newCache
numPrecedingRowsAdded = newPrecedingAdded
}

override def next(): ColumnarBatch = {
var outputBatch: ColumnarBatch = null
while (outputBatch == null && hasNext) {
withResource(getNextInputBatch) { inputCbSpillable =>
Collaborator:
We need to have withRetry put in here somewhere. The hard part is making sure that we can roll back any of the caching.

We can calculate/get the inputRowCount, noMoreInput and numUnprocessedInCache without needing to get the input batch from inputCbSpillable so that might make it simpler to add in the retry logic.

I am fine if this is a follow on issue, but we need it fixed at some point.

Collaborator (author):

I'll have to address this in a follow-up. I'm still trying to sort out the missing rows problem.

Collaborator (author):

#10046 will address the withRetry part of the problem.
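The quantities the reviewer mentions are cheap to derive: the hold-back count depends only on the batch's row count and on whether more input is expected, never on the column data. A minimal Python sketch of that rule (hypothetical helper name; a sketch of the logic in the Scala code that follows, which also notes a TODO for negative `maxFollowing`, e.g. LAG):

```python
def rows_to_hold_back(input_row_count, no_more_input, max_following):
    """How many trailing rows cannot be finalized yet: the last
    `max_following` rows may have windows extending into the next
    batch, unless this is the final batch."""
    if no_more_input:
        return 0
    # Clamp at the batch size; negative maxFollowing (e.g. LAG) is a
    # known TODO in the PR and is not handled here.
    return min(max_following, input_row_count)

rows_to_hold_back(1000, False, 100)  # 100 rows wait for the next batch
rows_to_hold_back(50, False, 100)    # the entire batch is held back
rows_to_hold_back(1000, True, 100)   # final batch: everything finalized
```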

withResource(inputCbSpillable.getColumnarBatch()) { inputCB =>

val inputRowCount = inputCB.numRows()
val noMoreInput = !input.hasNext
numUnprocessedInCache = if (noMoreInput) {
// If there are no more input rows expected,
// this is the last output batch.
// Consider all rows in the batch as processed.
0
} else {
// More input rows expected. The last `maxFollowing` rows can't be finalized.
// Cannot exceed `inputRowCount`.
// TODO: Account for maxFollowing < 0 (e.g. LAG()) => numUnprocessedInCache = 0.
maxFollowing min inputRowCount
}

if (numPrecedingRowsAdded + numUnprocessedInCache >= inputRowCount) {
// No point calling windowing kernel: the results will simply be ignored.
logWarning("Not enough rows! Cannot output a batch.")
} else {
withResource(new NvtxWithMetrics("window", NvtxColor.CYAN, opTime)) { _ =>
withResource(computeBasicWindow(inputCB)) { outputCols =>
outputBatch = withResource(
trim(outputCols,
numPrecedingRowsAdded, numUnprocessedInCache)) { trimmed =>
convertToBatch(outputTypes, trimmed)
}
}
}
}

// Compute new cache using current input.
// TODO: Account for minPreceding >0 (e.g. LEAD()) => numPrecedingRowsAdded = 0.
numPrecedingRowsAdded = Math.abs(minPreceding) min (inputRowCount - numUnprocessedInCache)
val inputCols = Range(0, inputCB.numCols()).map {
inputCB.column(_).asInstanceOf[GpuColumnVector].getBase
}.toArray

val newCached = trim(inputCols,
inputRowCount - (numPrecedingRowsAdded + numUnprocessedInCache),
0)
resetInputCache(Some(newCached), numPrecedingRowsAdded)
}
}
}
numOutputBatches += 1
numOutputRows += outputBatch.numRows()
outputBatch
}
}

// Window Exec used exclusively for batching bounded window functions.
class GpuBatchedBoundedWindowExec(
override val windowOps: Seq[NamedExpression],
override val gpuPartitionSpec: Seq[Expression],
override val gpuOrderSpec: Seq[SortOrder],
override val child: SparkPlan)(
override val cpuPartitionSpec: Seq[Expression],
override val cpuOrderSpec: Seq[SortOrder],
minPreceding: Integer,
maxFollowing: Integer
) extends GpuWindowExec(windowOps,
gpuPartitionSpec,
gpuOrderSpec,
child)(cpuPartitionSpec, cpuOrderSpec) {

override def otherCopyArgs: Seq[AnyRef] =
cpuPartitionSpec :: cpuOrderSpec :: minPreceding :: maxFollowing :: Nil

override def childrenCoalesceGoal: Seq[CoalesceGoal] = Seq(outputBatching)

override def outputBatching: CoalesceGoal = if (gpuPartitionSpec.isEmpty) {
RequireSingleBatch
} else {
BatchedByKey(gpuPartitionOrdering)(cpuPartitionOrdering)
}

override protected def internalDoExecuteColumnar(): RDD[ColumnarBatch] = {
val numOutputBatches = gpuLongMetric(GpuMetric.NUM_OUTPUT_BATCHES)
val numOutputRows = gpuLongMetric(GpuMetric.NUM_OUTPUT_ROWS)
val opTime = gpuLongMetric(GpuMetric.OP_TIME)

val boundWindowOps = GpuBindReferences.bindGpuReferences(windowOps, child.output)
val boundPartitionSpec = GpuBindReferences.bindGpuReferences(gpuPartitionSpec, child.output)
val boundOrderSpec = GpuBindReferences.bindReferences(gpuOrderSpec, child.output)

child.executeColumnar().mapPartitions { iter =>
new GpuBatchedBoundedWindowIterator(iter, boundWindowOps, boundPartitionSpec,
boundOrderSpec, output.map(_.dataType).toArray, minPreceding, maxFollowing,
numOutputBatches, numOutputRows, opTime)
}
}
}
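The iterator's row bookkeeping can be modeled end to end on plain Python lists: each round carries `|minPreceding|` rows of preceding context at the top, holds back up to `maxFollowing` unfinished rows at the bottom, and emits only the fully-computed middle. This is a hypothetical sketch of the mechanism only, not the PR's implementation (which operates on cuDF columns and spillable batches):

```python
def batched_bounded_window(batches, min_preceding, max_following):
    """Yield output slices batch by batch. Names mirror the fields of
    GpuBatchedBoundedWindowIterator; "running the window kernel" is
    represented by simply emitting the finalized middle rows."""
    it = iter(batches)
    cached = []              # rows carried over into the next round
    num_preceding_added = 0  # rows at the top kept only for context
    unprocessed = 0          # rows at the bottom not yet finalized
    buf = next(it, None)
    while buf is not None or unprocessed > 0:
        if buf is not None:
            current = cached + buf   # concatenate cache with fresh input
            buf = next(it, None)
        else:
            current = cached         # no fresh input: drain the cache
        n = len(current)
        no_more_input = buf is None
        # Last batch: everything is finalized; otherwise hold back the tail.
        unprocessed = 0 if no_more_input else min(max_following, n)
        if num_preceding_added + unprocessed < n:
            # The window kernel would run here; emit the finalized middle.
            yield current[num_preceding_added : n - unprocessed]
        # New cache: preceding context plus the unfinished tail.
        new_preceding = min(abs(min_preceding), n - unprocessed)
        cached = current[n - unprocessed - new_preceding :]
        num_preceding_added = new_preceding

rows = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]
out = [r for b in batched_bounded_window(rows, -1, 2) for r in b]
# Every input row is emitted exactly once, in order: [1, 2, ..., 9]
```

Note how termination hinges on `unprocessed` reaching zero once the input is exhausted, matching the iterator's `hasNext: numUnprocessedInCache > 0 || input.hasNext`.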