Skip to content

Commit

Permalink
Build python output schema from udf expressions (#1794)
Browse files Browse the repository at this point in the history
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
  • Loading branch information
firestarman authored Feb 24, 2021
1 parent fe02e25 commit d2b6bfc
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 6 deletions.
4 changes: 1 addition & 3 deletions integration_tests/src/main/python/udf_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import pytest

from conftest import is_at_least_precommit_run, is_databricks_runtime
from conftest import is_at_least_precommit_run

from pyspark.sql.pandas.utils import require_minimum_pyarrow_version, require_minimum_pandas_version
try:
Expand Down Expand Up @@ -170,8 +170,6 @@ def pandas_sum(to_process: pd.Series) -> int:
conf=arrow_udf_conf)


@pytest.mark.xfail(condition=is_databricks_runtime(),
reason='https://github.com/NVIDIA/spark-rapids/issues/1644')
@ignore_order
@pytest.mark.parametrize('data_gen', [byte_gen, short_gen, int_gen], ids=idfn)
@pytest.mark.parametrize('window', udf_windows, ids=window_ids)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,12 @@ case class GpuArrowEvalPythonExec(

// cache in a local to avoid serializing the plan
val inputSchema = child.output.toStructType
val pythonOutputSchema = StructType.fromAttributes(resultAttrs)
// Build the Python output schema from UDF expressions instead of the 'resultAttrs', because
// the 'resultAttrs' is NOT always equal to the Python output schema. For example,
// On Databricks when projecting only one column from a Python UDF output where containing
// multiple result columns, there will be only one attribute in the 'resultAttrs' for the
// projecting output, but the output schema for this Python UDF contains multiple columns.
val pythonOutputSchema = StructType.fromAttributes(udfs.map(_.resultAttribute))

val childOutput = child.output
val targetBatchSize = batchSize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,8 +490,14 @@ trait GpuWindowInPandasExecBase extends UnaryExecNode with GpuExec {

lazy val isPythonOnGpuEnabled = GpuPythonHelper.isPythonOnGpuEnabled(conf, pythonModuleKey)
// cache in a local to avoid serializing the plan
val retAttributes = windowExpression.map(_.asInstanceOf[NamedExpression].toAttribute)
val pythonOutputSchema = StructType.fromAttributes(retAttributes)

// Build the Python output schema from UDF expressions instead of the 'windowExpression',
// because the 'windowExpression' does NOT always represent the Python output schema.
// For example, on Databricks when projecting only one column from a Python UDF output
// where containing multiple result columns, there will be only one item in the
// 'windowExpression' for the projecting output, but the output schema for this Python
// UDF contains multiple columns.
val pythonOutputSchema = StructType.fromAttributes(udfExpressions.map(_.resultAttribute))
val childOutput = child.output

// 8) Start processing.
Expand Down

0 comments on commit d2b6bfc

Please sign in to comment.