diff --git a/integration_tests/src/main/python/udf_test.py b/integration_tests/src/main/python/udf_test.py
index c5717a676a6..dbccee4a374 100644
--- a/integration_tests/src/main/python/udf_test.py
+++ b/integration_tests/src/main/python/udf_test.py
@@ -13,7 +13,6 @@
 # limitations under the License.
 
 import pytest
-from pyspark import BarrierTaskContext, TaskContext
 
 from conftest import is_at_least_precommit_run
 from spark_session import is_databricks_runtime, is_before_spark_330, is_before_spark_350, is_spark_341
@@ -34,7 +33,7 @@
         raise AssertionError("incorrect pyarrow version during required testing " + str(e))
     pytestmark = pytest.mark.skip(reason=str(e))
 
-from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect, assert_equal
+from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect
 from data_gen import *
 from marks import approximate_float, allow_non_gpu, ignore_order
 from pyspark.sql import Window
@@ -426,41 +425,3 @@ def test_func(spark):
 
             lambda data: [pd.DataFrame([len(list(data))])], schema="ret:integer")
     assert_gpu_and_cpu_are_equal_collect(test_func, conf=arrow_udf_conf)
-
-
-@pytest.mark.skipif(is_before_spark_350(),
-                    reason='mapInPandas with barrier mode is introduced by Pyspark 3.5.0')
-@pytest.mark.parametrize('is_barrier', [True, False], ids=idfn)
-def test_map_in_pandas_with_barrier_mode(is_barrier):
-    def func(iterator):
-        tc = TaskContext.get()
-        assert tc is not None
-        if is_barrier:
-            assert isinstance(tc, BarrierTaskContext)
-        else:
-            assert not isinstance(tc, BarrierTaskContext)
-
-        for batch in iterator:
-            yield batch
-
-    assert_gpu_and_cpu_are_equal_collect(
-        lambda spark: spark.range(0, 10, 1, 1).mapInPandas(func, "id long", is_barrier))
-
-
-@pytest.mark.skipif(is_before_spark_350(),
-                    reason='mapInArrow with barrier mode is introduced by Pyspark 3.5.0')
-@pytest.mark.parametrize('is_barrier', [True, False], ids=idfn)
-def test_map_in_arrow_with_barrier_mode(is_barrier):
-    def func(iterator):
-        tc = TaskContext.get()
-        assert tc is not None
-        if is_barrier:
-            assert isinstance(tc, BarrierTaskContext)
-        else:
-            assert not isinstance(tc, BarrierTaskContext)
-
-        for batch in iterator:
-            yield batch
-
-    assert_gpu_and_cpu_are_equal_collect(
-        lambda spark: spark.range(0, 10, 1, 1).mapInArrow(func, "id long", is_barrier))
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index 64ac9808c61..050eae46a07 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -67,7 +67,7 @@ import org.apache.spark.sql.rapids.catalyst.expressions.GpuRand
 import org.apache.spark.sql.rapids.execution._
 import org.apache.spark.sql.rapids.execution.python._
 import org.apache.spark.sql.rapids.execution.python.GpuFlatMapGroupsInPandasExecMeta
-import org.apache.spark.sql.rapids.shims.{GpuAscii, GpuMapInPandasExecMeta, GpuTimeAdd}
+import org.apache.spark.sql.rapids.shims.{GpuAscii, GpuTimeAdd}
 import org.apache.spark.sql.rapids.zorder.ZOrderRules
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInBatchExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInBatchExec.scala
index 35d166dbc5f..0199c52250f 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInBatchExec.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInBatchExec.scala
@@ -44,8 +44,6 @@ trait GpuMapInBatchExec extends ShimUnaryExecNode with GpuPythonExecBase {
   protected val func: Expression
   protected val pythonEvalType: Int
 
-  protected val isBarrier: Boolean
-
   private val pandasFunction = func.asInstanceOf[GpuPythonUDF].func
 
   override def producedAttributes: AttributeSet = AttributeSet(output)
@@ -67,7 +65,7 @@ trait GpuMapInBatchExec extends ShimUnaryExecNode with GpuPythonExecBase {
     val localEvalType = pythonEvalType
 
     // Start process
-    val func = (inputIter: Iterator[ColumnarBatch]) => {
+    child.executeColumnar().mapPartitionsInternal { inputIter =>
       val context = TaskContext.get()
 
       // Single function with one struct.
@@ -111,14 +109,7 @@ trait GpuMapInBatchExec extends ShimUnaryExecNode with GpuPythonExecBase {
         numOutputRows += cb.numRows
         cb
       }
-    } // end of func
-
-    if (isBarrier) {
-      child.executeColumnar().barrier().mapPartitions(func)
-    } else {
-      child.executeColumnar().mapPartitionsInternal(func)
-    }
-
+    } // end of mapPartitionsInternal
 
   } // end of internalDoExecuteColumnar
 }
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInPandasExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInPandasExec.scala
index a678f1336f0..aaca88c38a0 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInPandasExec.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInPandasExec.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2020-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2022, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -21,9 +21,9 @@ import com.nvidia.spark.rapids._
 import org.apache.spark.api.python.PythonEvalType
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, PythonUDF}
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.python.MapInPandasExec
+import org.apache.spark.sql.execution.python._
 
-class GpuMapInPandasExecMetaBase(
+class GpuMapInPandasExecMeta(
     mapPandas: MapInPandasExec,
     conf: RapidsConf,
     parent: Option[RapidsMeta[_, _, _]],
@@ -34,9 +34,9 @@ class GpuMapInPandasExecMetaBase(
   override def noReplacementPossibleMessage(reasons: String): String =
     s"cannot run even partially on the GPU because $reasons"
 
-  protected val udf: BaseExprMeta[PythonUDF] = GpuOverrides.wrapExpr(
+  private val udf: BaseExprMeta[PythonUDF] = GpuOverrides.wrapExpr(
     mapPandas.func.asInstanceOf[PythonUDF], conf, Some(this))
-  protected val resultAttrs: Seq[BaseExprMeta[Attribute]] =
+  private val resultAttrs: Seq[BaseExprMeta[Attribute]] =
     mapPandas.output.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
 
   override val childExprs: Seq[BaseExprMeta[_]] = resultAttrs :+ udf
@@ -45,8 +45,7 @@ class GpuMapInPandasExecMetaBase(
     GpuMapInPandasExec(
       udf.convertToGpu(),
       resultAttrs.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]],
-      childPlans.head.convertIfNeeded(),
-      isBarrier = false,
+      childPlans.head.convertIfNeeded()
     )
 
 }
@@ -61,8 +60,7 @@
 case class GpuMapInPandasExec(
     func: Expression,
     output: Seq[Attribute],
-    child: SparkPlan,
-    override val isBarrier: Boolean) extends GpuMapInBatchExec {
+    child: SparkPlan) extends GpuMapInBatchExec {
 
   override protected val pythonEvalType: Int = PythonEvalType.SQL_MAP_PANDAS_ITER_UDF
 }
diff --git a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/shims/GpuMapInPandasExecMeta.scala b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/shims/GpuMapInPandasExecMeta.scala
deleted file mode 100644
index a0fb7353581..00000000000
--- a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/shims/GpuMapInPandasExecMeta.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright (c) 2024, NVIDIA CORPORATION.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.rapids.shims
-
-import com.nvidia.spark.rapids._
-
-import org.apache.spark.sql.execution.python.MapInPandasExec
-import org.apache.spark.sql.rapids.execution.python.GpuMapInPandasExecMetaBase
-
-/*** spark-rapids-shim-json-lines
-{"spark": "311"}
-{"spark": "312"}
-{"spark": "313"}
-{"spark": "320"}
-{"spark": "321"}
-{"spark": "321cdh"}
-{"spark": "322"}
-{"spark": "323"}
-{"spark": "324"}
-{"spark": "330"}
-{"spark": "330cdh"}
-{"spark": "331"}
-{"spark": "332"}
-{"spark": "332cdh"}
-{"spark": "333"}
-{"spark": "334"}
-{"spark": "340"}
-{"spark": "341"}
-{"spark": "342"}
-spark-rapids-shim-json-lines ***/
-class GpuMapInPandasExecMeta(
-    mapPandas: MapInPandasExec,
-    conf: RapidsConf,
-    parent: Option[RapidsMeta[_, _, _]],
-    rule: DataFromReplacementRule)
-  extends GpuMapInPandasExecMetaBase(mapPandas, conf, parent, rule) {
-
-}
diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/PythonMapInArrowExecShims.scala b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/PythonMapInArrowExecShims.scala
index 79425c47e00..958f9fc89fd 100644
--- a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/PythonMapInArrowExecShims.scala
+++ b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/PythonMapInArrowExecShims.scala
@@ -35,7 +35,7 @@ import com.nvidia.spark.rapids._
 
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.python.PythonMapInArrowExec
-import org.apache.spark.sql.rapids.shims.GpuPythonMapInArrowExecMeta
+import org.apache.spark.sql.rapids.execution.python.GpuPythonMapInArrowExecMeta
 
 object PythonMapInArrowExecShims {
 
diff --git a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExec.scala b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExec.scala
index 5eca8b18294..6d06a97db06 100644
--- a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExec.scala
+++ b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExec.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2022-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2022-2023, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -31,17 +31,16 @@
 {"spark": "350"}
 {"spark": "351"}
 spark-rapids-shim-json-lines ***/
-package org.apache.spark.sql.rapids.shims
+package org.apache.spark.sql.rapids.execution.python
 
 import com.nvidia.spark.rapids._
 
 import org.apache.spark.api.python.PythonEvalType
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, PythonUDF}
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.python.PythonMapInArrowExec
-import org.apache.spark.sql.rapids.execution.python.GpuMapInBatchExec
+import org.apache.spark.sql.execution.python._
 
-class GpuPythonMapInArrowExecMetaBase(
+class GpuPythonMapInArrowExecMeta(
     mapArrow: PythonMapInArrowExec,
     conf: RapidsConf,
     parent: Option[RapidsMeta[_, _, _]],
@@ -52,9 +51,9 @@ class GpuPythonMapInArrowExecMetaBase(
   override def noReplacementPossibleMessage(reasons: String): String =
     s"cannot run even partially on the GPU because $reasons"
 
-  protected val udf: BaseExprMeta[PythonUDF] = GpuOverrides.wrapExpr(
+  private val udf: BaseExprMeta[PythonUDF] = GpuOverrides.wrapExpr(
     mapArrow.func.asInstanceOf[PythonUDF], conf, Some(this))
-  protected val resultAttrs: Seq[BaseExprMeta[Attribute]] =
+  private val resultAttrs: Seq[BaseExprMeta[Attribute]] =
     mapArrow.output.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
 
   override val childExprs: Seq[BaseExprMeta[_]] = resultAttrs :+ udf
@@ -63,8 +62,7 @@ class GpuPythonMapInArrowExecMetaBase(
     GpuPythonMapInArrowExec(
       udf.convertToGpu(),
       resultAttrs.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]],
-      childPlans.head.convertIfNeeded(),
-      isBarrier = false,
+      childPlans.head.convertIfNeeded()
     )
 
 }
@@ -79,8 +77,7 @@
 case class GpuPythonMapInArrowExec(
     func: Expression,
     output: Seq[Attribute],
-    child: SparkPlan,
-    override val isBarrier: Boolean) extends GpuMapInBatchExec {
+    child: SparkPlan) extends GpuMapInBatchExec {
 
   override protected val pythonEvalType: Int = PythonEvalType.SQL_MAP_ARROW_ITER_UDF
 }
diff --git a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExecMeta.scala b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExecMeta.scala
deleted file mode 100644
index e9d711315a9..00000000000
--- a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExecMeta.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright (c) 2024, NVIDIA CORPORATION.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*** spark-rapids-shim-json-lines
-{"spark": "330"}
-{"spark": "330cdh"}
-{"spark": "330db"}
-{"spark": "331"}
-{"spark": "332"}
-{"spark": "332cdh"}
-{"spark": "332db"}
-{"spark": "333"}
-{"spark": "334"}
-{"spark": "340"}
-{"spark": "341"}
-{"spark": "341db"}
-{"spark": "342"}
-spark-rapids-shim-json-lines ***/
-package org.apache.spark.sql.rapids.shims
-
-import com.nvidia.spark.rapids._
-
-import org.apache.spark.sql.execution.python._
-
-class GpuPythonMapInArrowExecMeta(
-    mapArrow: PythonMapInArrowExec,
-    conf: RapidsConf,
-    parent: Option[RapidsMeta[_, _, _]],
-    rule: DataFromReplacementRule)
-  extends GpuPythonMapInArrowExecMetaBase(mapArrow, conf, parent, rule) {
-
-}
diff --git a/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/PythonMapInArrowExecShims.scala b/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/PythonMapInArrowExecShims.scala
index 98826aa324b..98ccc540613 100644
--- a/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/PythonMapInArrowExecShims.scala
+++ b/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/PythonMapInArrowExecShims.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.python.PythonMapInArrowExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.rapids.execution.TrampolineUtil
-import org.apache.spark.sql.rapids.shims.GpuPythonMapInArrowExecMeta
+import org.apache.spark.sql.rapids.execution.python.GpuPythonMapInArrowExecMeta
 import org.apache.spark.sql.types.{BinaryType, StringType}
 
 object PythonMapInArrowExecShims {
diff --git a/sql-plugin/src/main/spark350/scala/org/apache/spark/sql/rapids/shims/GpuMapInPandasExecMeta.scala b/sql-plugin/src/main/spark350/scala/org/apache/spark/sql/rapids/shims/GpuMapInPandasExecMeta.scala
deleted file mode 100644
index d8377f9c349..00000000000
--- a/sql-plugin/src/main/spark350/scala/org/apache/spark/sql/rapids/shims/GpuMapInPandasExecMeta.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright (c) 2024, NVIDIA CORPORATION.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.rapids.shims
-
-import com.nvidia.spark.rapids._
-
-import org.apache.spark.sql.catalyst.expressions.{Attribute}
-import org.apache.spark.sql.execution.python.MapInPandasExec
-import org.apache.spark.sql.rapids.execution.python.{GpuMapInPandasExec, GpuMapInPandasExecMetaBase}
-
-/*** spark-rapids-shim-json-lines
-{"spark": "350"}
-{"spark": "351"}
-spark-rapids-shim-json-lines ***/
-class GpuMapInPandasExecMeta(
-    mapPandas: MapInPandasExec,
-    conf: RapidsConf,
-    parent: Option[RapidsMeta[_, _, _]],
-    rule: DataFromReplacementRule)
-  extends GpuMapInPandasExecMetaBase(mapPandas, conf, parent, rule) {
-
-  override def convertToGpu(): GpuExec =
-    GpuMapInPandasExec(
-      udf.convertToGpu(),
-      resultAttrs.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]],
-      childPlans.head.convertIfNeeded(),
-      isBarrier = mapPandas.isBarrier,
-    )
-}
diff --git a/sql-plugin/src/main/spark350/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExecMeta.scala b/sql-plugin/src/main/spark350/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExecMeta.scala
deleted file mode 100644
index c27f4824c4a..00000000000
--- a/sql-plugin/src/main/spark350/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExecMeta.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright (c) 2024, NVIDIA CORPORATION.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*** spark-rapids-shim-json-lines
-{"spark": "350"}
-{"spark": "351"}
-spark-rapids-shim-json-lines ***/
-package org.apache.spark.sql.rapids.shims
-
-import com.nvidia.spark.rapids._
-
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.python.PythonMapInArrowExec
-
-class GpuPythonMapInArrowExecMeta(
-    mapArrow: PythonMapInArrowExec,
-    conf: RapidsConf,
-    parent: Option[RapidsMeta[_, _, _]],
-    rule: DataFromReplacementRule)
-  extends GpuPythonMapInArrowExecMetaBase(mapArrow, conf, parent, rule) {
-
-  override def convertToGpu(): GpuExec =
-    GpuPythonMapInArrowExec(
-      udf.convertToGpu(),
-      resultAttrs.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]],
-      childPlans.head.convertIfNeeded(),
-      isBarrier = mapArrow.isBarrier,
-    )
-}
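Note for reviewers: the integration tests deleted at the top of this patch exercised the barrier-mode variant of mapInPandas/mapInArrow that PySpark 3.5.0 introduced. For reference, below is a minimal standalone sketch of that API, adapted from the deleted test; the SparkSession setup, the function name echo_batches, and the final show() call are illustrative additions and are not taken from this patch.

```python
from pyspark import BarrierTaskContext, TaskContext
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("barrier-mapInPandas-sketch").getOrCreate()

def echo_batches(iterator):
    # With barrier=True all partitions are scheduled together and
    # TaskContext.get() returns a BarrierTaskContext; with barrier=False
    # it is a plain TaskContext. The deleted tests asserted exactly this.
    tc = TaskContext.get()
    assert tc is not None
    print("barrier task context:", isinstance(tc, BarrierTaskContext))
    for batch in iterator:  # each batch is a pandas.DataFrame
        yield batch

# The third argument (barrier mode) exists only in PySpark 3.5.0 and later;
# mapInArrow accepts the same flag.
spark.range(0, 10, 1, 1).mapInPandas(echo_batches, "id long", True).show()
```

After this patch the GPU exec nodes no longer carry an isBarrier flag, and only the non-barrier code path (mapPartitionsInternal) remains in GpuMapInBatchExec.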