Update PandasUDF doc (NVIDIA#2089)
* Update PandasUDF doc

Update Pandas UDF doc with a more detailed description

Signed-off-by: Allen Xu <wjxiz1992@gmail.com>

* Resolve comments

* More doc clean

* doc clean and table reformat

* doc clean

* doc clean

* Doc update

* resolve comments

* Resolve comments

* Resolve comments

* resolve comments

* resolve comments

* Resolve comments
wjxiz1992 authored Apr 20, 2021
1 parent d1f9bcd commit 79b4789
Showing 2 changed files with 110 additions and 30 deletions.
140 changes: 110 additions & 30 deletions docs/additional-functionality/rapids-udfs.md
... implements a Hive simple UDF using
[native code](../../udf-examples/src/main/cpp/src) to count words in strings


## GPU Support for Pandas UDF

---
**NOTE**

The GPU support for Pandas UDF is an experimental feature, and may change at any point in time.

---

GPU support for Pandas UDF is built on Apache Spark's [Pandas UDF (user defined
function)](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#pandas-udfs-a-k-a-vectorized-udfs),
and has two features:

- **GPU Assignment (Scheduling) in Python Process**: Lets the Python process share the same GPU with
  the Spark executor JVM. Without this feature, in a non-isolated environment, a Pandas UDF (an
  independent Python daemon process) may try to use GPUs other than the one it is intended to run
  on. For example, if the user launches a TensorFlow session inside a Pandas UDF on a machine with
  8 GPUs, TensorFlow will automatically use all 8 GPUs, conflicting with the existing Spark
  executor JVM processes.

- **Increase Speed**: Speeds up data transfer between the JVM process and the Python process.
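
For reference, the kind of workload these features target is an ordinary Pandas UDF written with
the standard PySpark API. The sketch below is a minimal Series-to-Series UDF; the application
name, column names and data are illustrative only, and it assumes PySpark with pandas and PyArrow
installed:

```python
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

spark = SparkSession.builder.appName("pandas-udf-sketch").getOrCreate()

# A Series-to-Series Pandas UDF: each invocation receives a batch of rows as a
# pandas Series and returns a Series of the same length.
@pandas_udf("double")
def plus_one(v: pd.Series) -> pd.Series:
    return v + 1.0

df = spark.range(0, 1000).selectExpr("CAST(id AS DOUBLE) AS value")
df.select(plus_one("value").alias("value_plus_one")).show(5)
```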



To enable GPU support for Pandas UDF, you need to configure your Spark job with extra settings.

1. Make sure GPU `exclusive` mode is _disabled_. Note that this will not work if you are using
exclusive mode to assign GPUs under Spark. To disable exclusive mode, use
```shell
nvidia-smi -i 0 -c Default  # Set GPU 0 to default mode; run as root.
```

2. Currently the Python files are packed into the RAPIDS Accelerator jar.

On Yarn, you need to add
```shell
...
```

On Standalone, you need to add
```shell
...
--conf spark.executorEnv.PYTHONPATH=${SPARK_RAPIDS_PLUGIN_JAR} \
--py-files ${SPARK_RAPIDS_PLUGIN_JAR}
```

3. Enable GPU Assignment (Scheduling) for Pandas UDF.

```shell
...
--conf spark.rapids.python.gpu.enabled=true \
--conf spark.rapids.python.memory.gpu.pooling.enabled=false \
--conf spark.rapids.sql.exec.ArrowEvalPythonExec=true \
--conf spark.rapids.sql.exec.MapInPandasExec=true \
--conf spark.rapids.sql.exec.FlatMapGroupsInPandasExec=true \
--conf spark.rapids.sql.exec.AggregateInPandasExec=true \
--conf spark.rapids.sql.exec.FlatMapCoGroupsInPandasExec=true \
--conf spark.rapids.sql.exec.WindowInPandasExec=true
```

Please note that every type of Pandas UDF on Spark is run by a specific Spark execution plan, and
the RAPIDS Accelerator supports each of them with a one-to-one mapping. Not all Pandas UDF types
are data-transfer accelerated at present:

| Spark Execution Plan | Data Transfer Accelerated | Use Case |
|----------------------|---------------------------|----------|
| ArrowEvalPythonExec | yes | [Series to Series](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#series-to-series), [Iterator of Series to Iterator of Series](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#iterator-of-series-to-iterator-of-series) and [Iterator of Multiple Series to Iterator of Series](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#iterator-of-multiple-series-to-iterator-of-series) |
| MapInPandasExec | yes | [Map](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#map) |
| WindowInPandasExec | yes | [Window](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#series-to-scalar) |
| FlatMapGroupsInPandasExec | no | [Grouped Map](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#grouped-map) |
| AggregateInPandasExec | no | [Aggregate](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#series-to-scalar) |
| FlatMapCoGroupsInPandasExec | no | [Co-grouped Map](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#co-grouped-map) |
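
For illustration, the `Map` case from the table above (handled by `MapInPandasExec`) can be as
simple as the sketch below. The function name, filter logic and output schema are illustrative
only, and it assumes PySpark with pandas and PyArrow installed:

```python
from typing import Iterator

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("map-in-pandas-sketch").getOrCreate()

def keep_even(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    # mapInPandas receives an iterator of pandas DataFrames (one per batch)
    # and yields pandas DataFrames matching the declared output schema.
    for pdf in batches:
        yield pdf[pdf["id"] % 2 == 0]

df = spark.range(0, 1000)
df.mapInPandas(keep_even, schema="id long").show(5)
```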


### Other Configuration

The following configuration settings are also relevant for GPU support for Pandas UDF.

1. Memory efficiency

```shell
--conf spark.rapids.python.memory.gpu.pooling.enabled=false \
--conf spark.rapids.python.memory.gpu.allocFraction=0.1 \
--conf spark.rapids.python.memory.gpu.maxAllocFraction=0.2 \
```
These are similar to the [RMM pooling for JVM](../tuning-guide.md#pooled-memory) settings
`spark.rapids.memory.gpu.allocFraction` and `spark.rapids.memory.gpu.maxAllocFraction`, except
that they specify the GPU pool size for the _Python processes_. Half of the GPU _available_
memory will be used by default if no fraction is specified. For example, with roughly 16 GiB of
GPU memory to draw from, an allocation fraction of 0.1 corresponds to a pool of about 1.6 GiB for
the Python side (see the back-of-envelope sketch after this list).


2. Limit of concurrent Python processes

```shell
--conf spark.rapids.python.concurrentPythonWorkers=2 \
```
This parameter limits the total number of concurrently running _Python processes_ for a Spark
executor. It defaults to 0, which means no limit. Note that in certain cases setting this value
too small _may result in a hang of your Spark job_, because a task may contain multiple Pandas
UDF (`MapInPandas`) instances, each of which results in a Python process. Each process will try
to acquire the Python GPU process semaphore, and this can lead to a deadlock because a Spark job
will not proceed until all of its tasks are finished.

For example, suppose a Spark stage contains 3 Pandas UDFs, 2 Spark tasks are running, each task
launches 3 Python processes, and `spark.rapids.python.concurrentPythonWorkers` is set to 4.

```python
df_1 = df_0.mapInPandas(udf_1, schema_1)
df_2 = df_1.mapInPandas(udf_2, schema_2)
df_3 = df_2.mapInPandas(udf_3, schema_3)
df_3.explain(True)
```
The RAPIDS Accelerator query explain output:
```
...
*Exec <MapInPandasExec> could partially run on GPU
*Exec <MapInPandasExec> could partially run on GPU
*Exec <MapInPandasExec> could partially run on GPU
...
```

![Python concurrent worker](../img/concurrentPythonWorker.PNG)

In this case, each Pandas UDF launches a Python process. At the moment shown, two Python processes
in each task (in light green) have acquired their semaphores, but neither task can proceed because
both are waiting for their third semaphore to start the task.

Another example uses `ArrowEvalPythonExec`, with the following code:

```python
import pyspark.sql.functions as F
...
df = df.withColumn("c_1",udf_1(F.col("a"), F.col("b")))
df = df.withColumn('c_2', F.hash(F.col('c_1')))
df = df.withColumn("c_3",udf_2(F.col("c_2")))
...
```
The physical plan:
```
+- GpuArrowEvalPython
+- ...
+- ...
+- GpuArrowEvalPython
```
This means each Spark task will trigger 2 Python processes. In this case, if we set
`spark.rapids.python.concurrentPythonWorkers=2`, it will also probably result in a hang, since we
allow 2 tasks to run and each of them spawns 2 Python processes. Say Task_1_Process_1 and
Task_2_Process_1 acquire the semaphore; neither of them can proceed because both are waiting for
their second semaphore. In general, the limit needs to be large enough to cover every Python
process of every concurrently running task, or be left at 0 for no limit (see the sketch below).
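
Putting the two groups of settings above together, the following is only a back-of-envelope
sketch based on the behavior described in this section; the helper names and numbers are
illustrative and are not part of the plugin:

```python
GIB = 1024 ** 3

def python_pool_bytes(gpu_mem_bytes: int, alloc_fraction: float = 0.5) -> int:
    """Rough GPU pool size reserved for the Python processes: a fraction of the
    GPU memory, defaulting to half of the available memory when no fraction is
    configured (per the memory settings above)."""
    return int(gpu_mem_bytes * alloc_fraction)

def min_safe_concurrent_workers(concurrent_tasks: int, python_procs_per_task: int) -> int:
    """A lower bound for spark.rapids.python.concurrentPythonWorkers that avoids the
    semaphore deadlock described above: every concurrently running task must be able
    to hold all of its Python-process semaphores at once (or use 0 for no limit)."""
    return concurrent_tasks * python_procs_per_task

# 16 GiB of GPU memory with an allocation fraction of 0.1 -> a pool of about 1.6 GiB.
print(python_pool_bytes(16 * GIB, 0.1) / GIB)

# The mapInPandas example above: 2 concurrent tasks, 3 Python processes per task -> at least 6.
print(min_safe_concurrent_workers(2, 3))
```
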
To find details on the above Python configuration settings, please see the [RAPIDS Accelerator for
Apache Spark Configuration Guide](../configs.md). Search for 'pandas' to quickly find the relevant
entries.
Binary file added docs/img/concurrentPythonWorker.PNG
