Skip to content

Commit

Permalink
Revert "Support barrier mode for mapInPandas/mapInArrow (#10364)" (#1…
Browse files Browse the repository at this point in the history
…0369)

This reverts commit cca5955.

Signed-off-by: Jason Lowe <jlowe@nvidia.com>
  • Loading branch information
jlowe authored Feb 2, 2024
1 parent c5dde86 commit fdd1a5d
Show file tree
Hide file tree
Showing 11 changed files with 20 additions and 255 deletions.
39 changes: 0 additions & 39 deletions integration_tests/src/main/python/udf_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# limitations under the License.

import pytest
from pyspark import BarrierTaskContext, TaskContext

from conftest import is_at_least_precommit_run
from spark_session import is_databricks_runtime, is_before_spark_330, is_before_spark_350, is_spark_341
Expand Down Expand Up @@ -426,41 +425,3 @@ def test_func(spark):
lambda data: [pd.DataFrame([len(list(data))])], schema="ret:integer")

assert_gpu_and_cpu_are_equal_collect(test_func, conf=arrow_udf_conf)


@pytest.mark.skipif(is_before_spark_350(),
reason='mapInPandas with barrier mode is introduced by Pyspark 3.5.0')
@pytest.mark.parametrize('is_barrier', [True, False], ids=idfn)
def test_map_in_pandas_with_barrier_mode(is_barrier):
def func(iterator):
tc = TaskContext.get()
assert tc is not None
if is_barrier:
assert isinstance(tc, BarrierTaskContext)
else:
assert not isinstance(tc, BarrierTaskContext)

for batch in iterator:
yield batch

assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.range(0, 10, 1, 1).mapInPandas(func, "id long", is_barrier))


@pytest.mark.skipif(is_before_spark_350(),
reason='mapInArrow with barrier mode is introduced by Pyspark 3.5.0')
@pytest.mark.parametrize('is_barrier', [True, False], ids=idfn)
def test_map_in_arrow_with_barrier_mode(is_barrier):
def func(iterator):
tc = TaskContext.get()
assert tc is not None
if is_barrier:
assert isinstance(tc, BarrierTaskContext)
else:
assert not isinstance(tc, BarrierTaskContext)

for batch in iterator:
yield batch

assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.range(0, 10, 1, 1).mapInArrow(func, "id long", is_barrier))
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ import org.apache.spark.sql.rapids.catalyst.expressions.GpuRand
import org.apache.spark.sql.rapids.execution._
import org.apache.spark.sql.rapids.execution.python._
import org.apache.spark.sql.rapids.execution.python.GpuFlatMapGroupsInPandasExecMeta
import org.apache.spark.sql.rapids.shims.{GpuAscii, GpuMapInPandasExecMeta, GpuTimeAdd}
import org.apache.spark.sql.rapids.shims.{GpuAscii, GpuTimeAdd}
import org.apache.spark.sql.rapids.zorder.ZOrderRules
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ trait GpuMapInBatchExec extends ShimUnaryExecNode with GpuPythonExecBase {
protected val func: Expression
protected val pythonEvalType: Int

protected val isBarrier: Boolean

private val pandasFunction = func.asInstanceOf[GpuPythonUDF].func

override def producedAttributes: AttributeSet = AttributeSet(output)
Expand All @@ -67,7 +65,7 @@ trait GpuMapInBatchExec extends ShimUnaryExecNode with GpuPythonExecBase {
val localEvalType = pythonEvalType

// Start process
val func = (inputIter: Iterator[ColumnarBatch]) => {
child.executeColumnar().mapPartitionsInternal { inputIter =>
val context = TaskContext.get()

// Single function with one struct.
Expand Down Expand Up @@ -111,14 +109,7 @@ trait GpuMapInBatchExec extends ShimUnaryExecNode with GpuPythonExecBase {
numOutputRows += cb.numRows
cb
}
} // end of func

if (isBarrier) {
child.executeColumnar().barrier().mapPartitions(func)
} else {
child.executeColumnar().mapPartitionsInternal(func)
}

} // end of mapPartitionsInternal
} // end of internalDoExecuteColumnar

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,9 +21,9 @@ import com.nvidia.spark.rapids._
import org.apache.spark.api.python.PythonEvalType
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, PythonUDF}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.python.MapInPandasExec
import org.apache.spark.sql.execution.python._

class GpuMapInPandasExecMetaBase(
class GpuMapInPandasExecMeta(
mapPandas: MapInPandasExec,
conf: RapidsConf,
parent: Option[RapidsMeta[_, _, _]],
Expand All @@ -34,9 +34,9 @@ class GpuMapInPandasExecMetaBase(
override def noReplacementPossibleMessage(reasons: String): String =
s"cannot run even partially on the GPU because $reasons"

protected val udf: BaseExprMeta[PythonUDF] = GpuOverrides.wrapExpr(
private val udf: BaseExprMeta[PythonUDF] = GpuOverrides.wrapExpr(
mapPandas.func.asInstanceOf[PythonUDF], conf, Some(this))
protected val resultAttrs: Seq[BaseExprMeta[Attribute]] =
private val resultAttrs: Seq[BaseExprMeta[Attribute]] =
mapPandas.output.map(GpuOverrides.wrapExpr(_, conf, Some(this)))

override val childExprs: Seq[BaseExprMeta[_]] = resultAttrs :+ udf
Expand All @@ -45,8 +45,7 @@ class GpuMapInPandasExecMetaBase(
GpuMapInPandasExec(
udf.convertToGpu(),
resultAttrs.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]],
childPlans.head.convertIfNeeded(),
isBarrier = false,
childPlans.head.convertIfNeeded()
)
}

Expand All @@ -61,8 +60,7 @@ class GpuMapInPandasExecMetaBase(
case class GpuMapInPandasExec(
func: Expression,
output: Seq[Attribute],
child: SparkPlan,
override val isBarrier: Boolean) extends GpuMapInBatchExec {
child: SparkPlan) extends GpuMapInBatchExec {

override protected val pythonEvalType: Int = PythonEvalType.SQL_MAP_PANDAS_ITER_UDF
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import com.nvidia.spark.rapids._

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.python.PythonMapInArrowExec
import org.apache.spark.sql.rapids.shims.GpuPythonMapInArrowExecMeta
import org.apache.spark.sql.rapids.execution.python.GpuPythonMapInArrowExecMeta

object PythonMapInArrowExecShims {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,17 +31,16 @@
{"spark": "350"}
{"spark": "351"}
spark-rapids-shim-json-lines ***/
package org.apache.spark.sql.rapids.shims
package org.apache.spark.sql.rapids.execution.python

import com.nvidia.spark.rapids._

import org.apache.spark.api.python.PythonEvalType
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, PythonUDF}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.python.PythonMapInArrowExec
import org.apache.spark.sql.rapids.execution.python.GpuMapInBatchExec
import org.apache.spark.sql.execution.python._

class GpuPythonMapInArrowExecMetaBase(
class GpuPythonMapInArrowExecMeta(
mapArrow: PythonMapInArrowExec,
conf: RapidsConf,
parent: Option[RapidsMeta[_, _, _]],
Expand All @@ -52,9 +51,9 @@ class GpuPythonMapInArrowExecMetaBase(
override def noReplacementPossibleMessage(reasons: String): String =
s"cannot run even partially on the GPU because $reasons"

protected val udf: BaseExprMeta[PythonUDF] = GpuOverrides.wrapExpr(
private val udf: BaseExprMeta[PythonUDF] = GpuOverrides.wrapExpr(
mapArrow.func.asInstanceOf[PythonUDF], conf, Some(this))
protected val resultAttrs: Seq[BaseExprMeta[Attribute]] =
private val resultAttrs: Seq[BaseExprMeta[Attribute]] =
mapArrow.output.map(GpuOverrides.wrapExpr(_, conf, Some(this)))

override val childExprs: Seq[BaseExprMeta[_]] = resultAttrs :+ udf
Expand All @@ -63,8 +62,7 @@ class GpuPythonMapInArrowExecMetaBase(
GpuPythonMapInArrowExec(
udf.convertToGpu(),
resultAttrs.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]],
childPlans.head.convertIfNeeded(),
isBarrier = false,
childPlans.head.convertIfNeeded()
)
}

Expand All @@ -79,8 +77,7 @@ class GpuPythonMapInArrowExecMetaBase(
case class GpuPythonMapInArrowExec(
func: Expression,
output: Seq[Attribute],
child: SparkPlan,
override val isBarrier: Boolean) extends GpuMapInBatchExec {
child: SparkPlan) extends GpuMapInBatchExec {

override protected val pythonEvalType: Int = PythonEvalType.SQL_MAP_ARROW_ITER_UDF
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.python.PythonMapInArrowExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.sql.rapids.shims.GpuPythonMapInArrowExecMeta
import org.apache.spark.sql.rapids.execution.python.GpuPythonMapInArrowExecMeta
import org.apache.spark.sql.types.{BinaryType, StringType}

object PythonMapInArrowExecShims {
Expand Down

This file was deleted.

Loading

0 comments on commit fdd1a5d

Please sign in to comment.