Skip to content

Commit

Permalink
Support barrier mode for mapInPandas/mapInArrow (#10364)
Browse files Browse the repository at this point in the history
Signed-off-by: Bobby Wang <wbo4958@gmail.com>
  • Loading branch information
wbo4958 authored Feb 2, 2024
1 parent 3b14e01 commit cca5955
Show file tree
Hide file tree
Showing 11 changed files with 255 additions and 20 deletions.
39 changes: 39 additions & 0 deletions integration_tests/src/main/python/udf_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import pytest
from pyspark import BarrierTaskContext, TaskContext

from conftest import is_at_least_precommit_run
from spark_session import is_databricks_runtime, is_before_spark_330, is_before_spark_350, is_spark_341
Expand Down Expand Up @@ -425,3 +426,41 @@ def test_func(spark):
lambda data: [pd.DataFrame([len(list(data))])], schema="ret:integer")

assert_gpu_and_cpu_are_equal_collect(test_func, conf=arrow_udf_conf)


@pytest.mark.skipif(is_before_spark_350(),
reason='mapInPandas with barrier mode is introduced by Pyspark 3.5.0')
@pytest.mark.parametrize('is_barrier', [True, False], ids=idfn)
def test_map_in_pandas_with_barrier_mode(is_barrier):
def func(iterator):
tc = TaskContext.get()
assert tc is not None
if is_barrier:
assert isinstance(tc, BarrierTaskContext)
else:
assert not isinstance(tc, BarrierTaskContext)

for batch in iterator:
yield batch

assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.range(0, 10, 1, 1).mapInPandas(func, "id long", is_barrier))


@pytest.mark.skipif(is_before_spark_350(),
reason='mapInArrow with barrier mode is introduced by Pyspark 3.5.0')
@pytest.mark.parametrize('is_barrier', [True, False], ids=idfn)
def test_map_in_arrow_with_barrier_mode(is_barrier):
def func(iterator):
tc = TaskContext.get()
assert tc is not None
if is_barrier:
assert isinstance(tc, BarrierTaskContext)
else:
assert not isinstance(tc, BarrierTaskContext)

for batch in iterator:
yield batch

assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.range(0, 10, 1, 1).mapInArrow(func, "id long", is_barrier))
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ import org.apache.spark.sql.rapids.catalyst.expressions.GpuRand
import org.apache.spark.sql.rapids.execution._
import org.apache.spark.sql.rapids.execution.python._
import org.apache.spark.sql.rapids.execution.python.GpuFlatMapGroupsInPandasExecMeta
import org.apache.spark.sql.rapids.shims.{GpuAscii, GpuTimeAdd}
import org.apache.spark.sql.rapids.shims.{GpuAscii, GpuMapInPandasExecMeta, GpuTimeAdd}
import org.apache.spark.sql.rapids.zorder.ZOrderRules
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ trait GpuMapInBatchExec extends ShimUnaryExecNode with GpuPythonExecBase {
protected val func: Expression
protected val pythonEvalType: Int

protected val isBarrier: Boolean

private val pandasFunction = func.asInstanceOf[GpuPythonUDF].func

override def producedAttributes: AttributeSet = AttributeSet(output)
Expand All @@ -65,7 +67,7 @@ trait GpuMapInBatchExec extends ShimUnaryExecNode with GpuPythonExecBase {
val localEvalType = pythonEvalType

// Start process
child.executeColumnar().mapPartitionsInternal { inputIter =>
val func = (inputIter: Iterator[ColumnarBatch]) => {
val context = TaskContext.get()

// Single function with one struct.
Expand Down Expand Up @@ -109,7 +111,14 @@ trait GpuMapInBatchExec extends ShimUnaryExecNode with GpuPythonExecBase {
numOutputRows += cb.numRows
cb
}
} // end of mapPartitionsInternal
} // end of func

if (isBarrier) {
child.executeColumnar().barrier().mapPartitions(func)
} else {
child.executeColumnar().mapPartitionsInternal(func)
}

} // end of internalDoExecuteColumnar

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,9 +21,9 @@ import com.nvidia.spark.rapids._
import org.apache.spark.api.python.PythonEvalType
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, PythonUDF}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.python._
import org.apache.spark.sql.execution.python.MapInPandasExec

class GpuMapInPandasExecMeta(
class GpuMapInPandasExecMetaBase(
mapPandas: MapInPandasExec,
conf: RapidsConf,
parent: Option[RapidsMeta[_, _, _]],
Expand All @@ -34,9 +34,9 @@ class GpuMapInPandasExecMeta(
override def noReplacementPossibleMessage(reasons: String): String =
s"cannot run even partially on the GPU because $reasons"

private val udf: BaseExprMeta[PythonUDF] = GpuOverrides.wrapExpr(
protected val udf: BaseExprMeta[PythonUDF] = GpuOverrides.wrapExpr(
mapPandas.func.asInstanceOf[PythonUDF], conf, Some(this))
private val resultAttrs: Seq[BaseExprMeta[Attribute]] =
protected val resultAttrs: Seq[BaseExprMeta[Attribute]] =
mapPandas.output.map(GpuOverrides.wrapExpr(_, conf, Some(this)))

override val childExprs: Seq[BaseExprMeta[_]] = resultAttrs :+ udf
Expand All @@ -45,7 +45,8 @@ class GpuMapInPandasExecMeta(
GpuMapInPandasExec(
udf.convertToGpu(),
resultAttrs.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]],
childPlans.head.convertIfNeeded()
childPlans.head.convertIfNeeded(),
isBarrier = false,
)
}

Expand All @@ -60,7 +61,8 @@ class GpuMapInPandasExecMeta(
case class GpuMapInPandasExec(
func: Expression,
output: Seq[Attribute],
child: SparkPlan) extends GpuMapInBatchExec {
child: SparkPlan,
override val isBarrier: Boolean) extends GpuMapInBatchExec {

override protected val pythonEvalType: Int = PythonEvalType.SQL_MAP_PANDAS_ITER_UDF
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.rapids.shims

import com.nvidia.spark.rapids._

import org.apache.spark.sql.execution.python.MapInPandasExec
import org.apache.spark.sql.rapids.execution.python.GpuMapInPandasExecMetaBase

/*** spark-rapids-shim-json-lines
{"spark": "311"}
{"spark": "312"}
{"spark": "313"}
{"spark": "320"}
{"spark": "321"}
{"spark": "321cdh"}
{"spark": "322"}
{"spark": "323"}
{"spark": "324"}
{"spark": "330"}
{"spark": "330cdh"}
{"spark": "331"}
{"spark": "332"}
{"spark": "332cdh"}
{"spark": "333"}
{"spark": "334"}
{"spark": "340"}
{"spark": "341"}
{"spark": "342"}
spark-rapids-shim-json-lines ***/
class GpuMapInPandasExecMeta(
mapPandas: MapInPandasExec,
conf: RapidsConf,
parent: Option[RapidsMeta[_, _, _]],
rule: DataFromReplacementRule)
extends GpuMapInPandasExecMetaBase(mapPandas, conf, parent, rule) {

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import com.nvidia.spark.rapids._

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.python.PythonMapInArrowExec
import org.apache.spark.sql.rapids.execution.python.GpuPythonMapInArrowExecMeta
import org.apache.spark.sql.rapids.shims.GpuPythonMapInArrowExecMeta

object PythonMapInArrowExecShims {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,16 +31,17 @@
{"spark": "350"}
{"spark": "351"}
spark-rapids-shim-json-lines ***/
package org.apache.spark.sql.rapids.execution.python
package org.apache.spark.sql.rapids.shims

import com.nvidia.spark.rapids._

import org.apache.spark.api.python.PythonEvalType
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, PythonUDF}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.python._
import org.apache.spark.sql.execution.python.PythonMapInArrowExec
import org.apache.spark.sql.rapids.execution.python.GpuMapInBatchExec

class GpuPythonMapInArrowExecMeta(
class GpuPythonMapInArrowExecMetaBase(
mapArrow: PythonMapInArrowExec,
conf: RapidsConf,
parent: Option[RapidsMeta[_, _, _]],
Expand All @@ -51,9 +52,9 @@ class GpuPythonMapInArrowExecMeta(
override def noReplacementPossibleMessage(reasons: String): String =
s"cannot run even partially on the GPU because $reasons"

private val udf: BaseExprMeta[PythonUDF] = GpuOverrides.wrapExpr(
protected val udf: BaseExprMeta[PythonUDF] = GpuOverrides.wrapExpr(
mapArrow.func.asInstanceOf[PythonUDF], conf, Some(this))
private val resultAttrs: Seq[BaseExprMeta[Attribute]] =
protected val resultAttrs: Seq[BaseExprMeta[Attribute]] =
mapArrow.output.map(GpuOverrides.wrapExpr(_, conf, Some(this)))

override val childExprs: Seq[BaseExprMeta[_]] = resultAttrs :+ udf
Expand All @@ -62,7 +63,8 @@ class GpuPythonMapInArrowExecMeta(
GpuPythonMapInArrowExec(
udf.convertToGpu(),
resultAttrs.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]],
childPlans.head.convertIfNeeded()
childPlans.head.convertIfNeeded(),
isBarrier = false,
)
}

Expand All @@ -77,7 +79,8 @@ class GpuPythonMapInArrowExecMeta(
case class GpuPythonMapInArrowExec(
func: Expression,
output: Seq[Attribute],
child: SparkPlan) extends GpuMapInBatchExec {
child: SparkPlan,
override val isBarrier: Boolean) extends GpuMapInBatchExec {

override protected val pythonEvalType: Int = PythonEvalType.SQL_MAP_ARROW_ITER_UDF
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*** spark-rapids-shim-json-lines
{"spark": "330"}
{"spark": "330cdh"}
{"spark": "330db"}
{"spark": "331"}
{"spark": "332"}
{"spark": "332cdh"}
{"spark": "332db"}
{"spark": "333"}
{"spark": "334"}
{"spark": "340"}
{"spark": "341"}
{"spark": "341db"}
{"spark": "342"}
spark-rapids-shim-json-lines ***/
package org.apache.spark.sql.rapids.shims

import com.nvidia.spark.rapids._

import org.apache.spark.sql.execution.python._

class GpuPythonMapInArrowExecMeta(
mapArrow: PythonMapInArrowExec,
conf: RapidsConf,
parent: Option[RapidsMeta[_, _, _]],
rule: DataFromReplacementRule)
extends GpuPythonMapInArrowExecMetaBase(mapArrow, conf, parent, rule) {

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.python.PythonMapInArrowExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.sql.rapids.execution.python.GpuPythonMapInArrowExecMeta
import org.apache.spark.sql.rapids.shims.GpuPythonMapInArrowExecMeta
import org.apache.spark.sql.types.{BinaryType, StringType}

object PythonMapInArrowExecShims {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.rapids.shims

import com.nvidia.spark.rapids._

import org.apache.spark.sql.catalyst.expressions.{Attribute}
import org.apache.spark.sql.execution.python.MapInPandasExec
import org.apache.spark.sql.rapids.execution.python.{GpuMapInPandasExec, GpuMapInPandasExecMetaBase}

/*** spark-rapids-shim-json-lines
{"spark": "350"}
{"spark": "351"}
spark-rapids-shim-json-lines ***/
class GpuMapInPandasExecMeta(
mapPandas: MapInPandasExec,
conf: RapidsConf,
parent: Option[RapidsMeta[_, _, _]],
rule: DataFromReplacementRule)
extends GpuMapInPandasExecMetaBase(mapPandas, conf, parent, rule) {

override def convertToGpu(): GpuExec =
GpuMapInPandasExec(
udf.convertToGpu(),
resultAttrs.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]],
childPlans.head.convertIfNeeded(),
isBarrier = mapPandas.isBarrier,
)
}
Loading

0 comments on commit cca5955

Please sign in to comment.