Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PythonRunner Changes [databricks] #10274

Merged
merged 5 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions integration_tests/src/main/python/spark_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ def is_spark_330_or_later():
def is_spark_340_or_later():
return spark_version() >= "3.4.0"

def is_spark_341():
return spark_version() == "3.4.1"

def is_spark_350_or_later():
return spark_version() >= "3.5.0"

Expand Down
17 changes: 7 additions & 10 deletions integration_tests/src/main/python/udf_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import pytest

from conftest import is_at_least_precommit_run, is_not_utc
from spark_session import is_databricks_runtime, is_before_spark_330, is_before_spark_350, is_spark_340_or_later
from spark_session import is_databricks_runtime, is_before_spark_330, is_before_spark_350, is_spark_341

from pyspark.sql.pandas.utils import require_minimum_pyarrow_version, require_minimum_pandas_version

Expand Down Expand Up @@ -43,12 +43,6 @@
import pyarrow
from typing import Iterator, Tuple


if is_databricks_runtime() and is_spark_340_or_later():
# Databricks 13.3 does not use separate reader/writer threads for Python UDFs
# which can lead to hangs. Skipping these tests until the Python UDF handling is updated.
pytestmark = pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/9493")

arrow_udf_conf = {
'spark.sql.execution.arrow.pyspark.enabled': 'true',
'spark.rapids.sql.exec.WindowInPandasExec': 'true',
Expand Down Expand Up @@ -182,7 +176,10 @@ def group_size_udf(to_process: pd.Series) -> int:

low_upper_win = Window.partitionBy('a').orderBy('b').rowsBetween(-3, 3)

udf_windows = [no_part_win, unbounded_win, cur_follow_win, pre_cur_win, low_upper_win]
running_win_param = pytest.param(pre_cur_win, marks=pytest.mark.xfail(
condition=is_databricks_runtime() and is_spark_341(),
reason='DB13.3 wrongly uses RunningWindowFunctionExec to evaluate a PythonUDAF and it will fail even on CPU'))
udf_windows = [no_part_win, unbounded_win, cur_follow_win, running_win_param, low_upper_win]
window_ids = ['No_Partition', 'Unbounded', 'Unbounded_Following', 'Unbounded_Preceding',
'Lower_Upper']

Expand Down Expand Up @@ -338,8 +335,8 @@ def create_df(spark, data_gen, left_length, right_length):
@ignore_order
@pytest.mark.parametrize('data_gen', [ShortGen(nullable=False)], ids=idfn)
def test_cogroup_apply_udf(data_gen):
def asof_join(l, r):
return pd.merge_asof(l, r, on='a', by='b')
def asof_join(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
return pd.merge_ordered(left, right)

def do_it(spark):
left, right = create_df(spark, data_gen, 500, 500)
Expand Down
10 changes: 8 additions & 2 deletions jenkins/databricks/build.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash
#
# Copyright (c) 2020-2023, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2020-2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -52,7 +52,13 @@ declare -A artifacts
initialize()
{
# install rsync to be used for copying onto the databricks nodes
sudo apt install -y maven rsync
sudo apt install -y rsync

if [[ ! -d $HOME/apache-maven-3.6.3 ]]; then
wget https://archive.apache.org/dist/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz -P /tmp
tar xf /tmp/apache-maven-3.6.3-bin.tar.gz -C $HOME
sudo ln -s $HOME/apache-maven-3.6.3/bin/mvn /usr/local/bin/mvn
fi

# Archive file location of the plugin repository
SPARKSRCTGZ=${SPARKSRCTGZ:-''}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ private static StructType structFromTypes(DataType[] format) {
return new StructType(fields);
}

private static StructType structFromAttributes(List<Attribute> format) {
public static StructType structFromAttributes(List<Attribute> format) {
StructField[] fields = new StructField[format.size()];
int i = 0;
for (Attribute attribute: format) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@ package com.nvidia.spark.rapids

import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.Table
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq

import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, MapType, StructType}

/**
* Utility class with methods for calculating various metrics about GPU memory usage
* prior to allocation.
* prior to allocation, along with some operations with batches.
*/
object GpuBatchUtils {

Expand Down Expand Up @@ -175,4 +179,37 @@ object GpuBatchUtils {
bytes
}
}

/**
* Concatenate the input batches into a single one.
* The caller is responsible for closing the returned batch.
*
* @param spillBatches the batches to be concatenated, will be closed after the call
* returns.
* @return the concatenated SpillableColumnarBatch or None if the input is empty.
*/
def concatSpillBatchesAndClose(
spillBatches: Seq[SpillableColumnarBatch]): Option[SpillableColumnarBatch] = {
val retBatch = if (spillBatches.length >= 2) {
// two or more batches, concatenate them
val (concatTable, types) = RmmRapidsRetryIterator.withRetryNoSplit(spillBatches) { _ =>
withResource(spillBatches.safeMap(_.getColumnarBatch())) { batches =>
val batchTypes = GpuColumnVector.extractTypes(batches.head)
withResource(batches.safeMap(GpuColumnVector.from)) { tables =>
(Table.concatenate(tables: _*), batchTypes)
}
}
}
// Make the concatenated table spillable.
withResource(concatTable) { _ =>
SpillableColumnarBatch(GpuColumnVector.from(concatTable, types),
SpillPriorities.ACTIVE_BATCHING_PRIORITY)
}
} else if (spillBatches.length == 1) {
// only one batch
spillBatches.head
} else null

Option(retBatch)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ package org.apache.spark.sql.rapids.execution
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.Table
import com.nvidia.spark.rapids.{GpuColumnVector, GpuExpression, GpuHashPartitioningBase, GpuMetric, RmmRapidsRetryIterator, SpillableColumnarBatch, SpillPriorities, TaskAutoCloseableResource}
import com.nvidia.spark.rapids.{GpuBatchUtils, GpuColumnVector, GpuExpression, GpuHashPartitioningBase, GpuMetric, SpillableColumnarBatch, SpillPriorities, TaskAutoCloseableResource}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RapidsPluginImplicits._

Expand All @@ -41,27 +40,7 @@ object GpuSubPartitionHashJoin {
*/
def concatSpillBatchesAndClose(
spillBatches: Seq[SpillableColumnarBatch]): Option[SpillableColumnarBatch] = {
val retBatch = if (spillBatches.length >= 2) {
// two or more batches, concatenate them
val (concatTable, types) = RmmRapidsRetryIterator.withRetryNoSplit(spillBatches) { _ =>
withResource(spillBatches.safeMap(_.getColumnarBatch())) { batches =>
val batchTypes = GpuColumnVector.extractTypes(batches.head)
withResource(batches.safeMap(GpuColumnVector.from)) { tables =>
(Table.concatenate(tables: _*), batchTypes)
}
}
}
// Make the concatenated table spillable.
withResource(concatTable) { _ =>
SpillableColumnarBatch(GpuColumnVector.from(concatTable, types),
SpillPriorities.ACTIVE_BATCHING_PRIORITY)
}
} else if (spillBatches.length == 1) {
// only one batch
spillBatches.head
} else null

Option(retBatch)
GpuBatchUtils.concatSpillBatchesAndClose(spillBatches)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ import ai.rapids.cudf
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit
import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion

import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.rapids.execution.python.shims.GpuPythonArrowOutput
import org.apache.spark.sql.rapids.execution.python.shims.GpuBasePythonRunner
import org.apache.spark.sql.rapids.shims.DataTypeUtilsShim
import org.apache.spark.sql.vectorized.ColumnarBatch

Expand Down Expand Up @@ -195,7 +196,7 @@ private[python] object BatchGroupUtils {
def executePython[IN](
pyInputIterator: Iterator[IN],
output: Seq[Attribute],
pyRunner: GpuPythonRunnerBase[IN],
pyRunner: GpuBasePythonRunner[IN],
outputRows: GpuMetric,
outputBatches: GpuMetric): Iterator[ColumnarBatch] = {
val context = TaskContext.get()
Expand Down Expand Up @@ -394,38 +395,72 @@ private[python] object BatchGroupedIterator {
class CombiningIterator(
inputBatchQueue: BatchQueue,
pythonOutputIter: Iterator[ColumnarBatch],
pythonArrowReader: GpuPythonArrowOutput,
pythonArrowReader: GpuArrowOutput,
numOutputRows: GpuMetric,
numOutputBatches: GpuMetric) extends Iterator[ColumnarBatch] {

// For `hasNext` we are waiting on the queue to have something inserted into it
// instead of waiting for a result to be ready from Python. The reason for this
// is to let us know the target number of rows in the batch that we want when reading.
// It is a bit hacked up but it works. In the future when we support spilling we should
// store the number of rows separate from the batch. That way we can get the target batch
// size out without needing to grab the GpuSemaphore which we cannot do if we might block
// on a read operation.
override def hasNext: Boolean = inputBatchQueue.hasNext || pythonOutputIter.hasNext
// This is only for the input.
private var pendingInput: Option[SpillableColumnarBatch] = None
Option(TaskContext.get()).foreach(onTaskCompletion(_)(pendingInput.foreach(_.close())))

// The Python output should line up row for row so we only look at the Python output
// iterator and no need to check the `inputPending` who will be consumed when draining
// the Python output.
override def hasNext: Boolean = pythonOutputIter.hasNext

override def next(): ColumnarBatch = {
val numRows = inputBatchQueue.peekBatchSize
val numRows = inputBatchQueue.peekBatchNumRows()
// Updates the expected batch size for next read
pythonArrowReader.setMinReadTargetBatchSize(numRows)
pythonArrowReader.setMinReadTargetNumRows(numRows)
// Reads next batch from Python and combines it with the input batch by the left side.
withResource(pythonOutputIter.next()) { cbFromPython =>
assert(cbFromPython.numRows() == numRows)
withResource(inputBatchQueue.remove()) { origBatch =>
// Here may get a batch has a larger rows number than the current input batch.
assert(cbFromPython.numRows() >= numRows,
s"Expects >=$numRows rows but got ${cbFromPython.numRows()} from the Python worker")
withResource(concatInputBatch(cbFromPython.numRows())) { concated =>
numOutputBatches += 1
numOutputRows += numRows
combine(origBatch, cbFromPython)
GpuColumnVector.combineColumns(concated, cbFromPython)
}
}
}

private def combine(lBatch: ColumnarBatch, rBatch: ColumnarBatch): ColumnarBatch = {
val lColumns = GpuColumnVector.extractColumns(lBatch).map(_.incRefCount())
val rColumns = GpuColumnVector.extractColumns(rBatch).map(_.incRefCount())
new ColumnarBatch(lColumns ++ rColumns, lBatch.numRows())
private def concatInputBatch(targetNumRows: Int): ColumnarBatch = {
withResource(mutable.ArrayBuffer[SpillableColumnarBatch]()) { buf =>
var curNumRows = pendingInput.map(_.numRows()).getOrElse(0)
pendingInput.foreach(buf.append(_))
pendingInput = None
while (curNumRows < targetNumRows) {
val scb = inputBatchQueue.remove()
if (scb != null) {
buf.append(scb)
curNumRows = curNumRows + scb.numRows()
}
}
assert(buf.nonEmpty, "The input queue is empty")

if (curNumRows > targetNumRows) {
// Need to split the last batch
val Array(first, second) = withRetryNoSplit(buf.remove(buf.size - 1)) { lastScb =>
val splitIdx = lastScb.numRows() - (curNumRows - targetNumRows)
withResource(lastScb.getColumnarBatch()) { lastCb =>
val batchTypes = GpuColumnVector.extractTypes(lastCb)
withResource(GpuColumnVector.from(lastCb)) { table =>
table.contiguousSplit(splitIdx).safeMap(
SpillableColumnarBatch(_, batchTypes, SpillPriorities.ACTIVE_ON_DECK_PRIORITY))
}
}
}
buf.append(first)
pendingInput = Some(second)
}

val ret = GpuBatchUtils.concatSpillBatchesAndClose(buf.toSeq)
// "ret" should be non empty because we checked the buf is not empty ahead.
withResource(ret.get) { concatedScb =>
concatedScb.getColumnarBatch()
}
} // end of withResource(mutable.ArrayBuffer)
}

}
Expand Down Expand Up @@ -560,3 +595,4 @@ class CoGroupedIterator(
keyOrdering.compare(leftKeyRow, rightKeyRow)
}
}

Loading
Loading