diff --git a/docs/additional-functionality/rapids-udfs.md b/docs/additional-functionality/rapids-udfs.md index 2c659150114..4870bb049ba 100644 --- a/docs/additional-functionality/rapids-udfs.md +++ b/docs/additional-functionality/rapids-udfs.md @@ -134,31 +134,39 @@ implements a Hive simple UDF using [native code](../../udf-examples/src/main/cpp/src) to count words in strings -## GPU Scheduling For Pandas UDF +## GPU Support for Pandas UDF --- **NOTE** -The _GPU Scheduling for Pandas UDF_ is an experimental feature, and may change at any point it time. +The GPU support for Pandas UDF is an experimental feature, and may change at any point it time. --- -_GPU Scheduling for Pandas UDF_ is built on Apache Spark's [Pandas UDF(user defined -function)](https://spark.apache.org/docs/3.0.0/sql-pyspark-pandas-with-arrow.html#pandas-udfs-aka-vectorized-udfs), -and has two components: +GPU support for Pandas UDF is built on Apache Spark's [Pandas UDF(user defined +function)](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#pandas-udfs-a-k-a-vectorized-udfs), +and has two features: -- **Share GPU with JVM**: Let the Python process share JVM GPU. The Python process could run on the - same GPU with JVM. +- **GPU Assignment(Scheduling) in Python Process**: Let the Python process share the same GPU with +Spark executor JVM. Without this feature, in a non-isolated environment, some use cases with +Pandas UDF (an `independent` Python daemon process) can try to use GPUs other than the one we want it to +run on. For example, the user could launch a TensorFlow session inside Pandas UDF and the machine +contains 8 GPUs. Without this GPU sharing feature, TensorFlow will automatically use all 8 GPUs +which will conflict with existing Spark executor JVM processes. -- **Increase Speed**: Make the data transport faster between JVM process and Python process. +- **Increase Speed**: Speeds up data transfer between JVM process and Python process. -To enable _GPU Scheduling for Pandas UDF_, you need to configure your spark job with extra settings. +To enable GPU support for Pandas UDF, you need to configure your Spark job with extra settings. -1. Make sure GPU exclusive mode is disabled. Note that this will not work if you are using exclusive - mode to assign GPUs under spark. -2. Currently the python files are packed into the spark rapids plugin jar. +1. Make sure GPU `exclusive` mode is _disabled_. Note that this will not work if you are using +exclusive mode to assign GPUs under Spark. To disable exclusive mode, use + ``` + nvidia-smi -i 0 -c Default # Set GPU 0 to default mode, run as root. + ``` + +2. Currently the Python files are packed into the RAPIDS Accelerator jar. On Yarn, you need to add ```shell @@ -170,35 +178,107 @@ To enable _GPU Scheduling for Pandas UDF_, you need to configure your spark job On Standalone, you need to add ```shell ... - --conf spark.executorEnv.PYTHONPATH=rapids-4-spark_2.12-0.5.0.jar \ + --conf spark.executorEnv.PYTHONPATH=${SPARK_RAPIDS_PLUGIN_JAR} \ --py-files ${SPARK_RAPIDS_PLUGIN_JAR} ``` -3. Enable GPU Scheduling for Pandas UDF. +3. Enable GPU Assignment(Scheduling) for Pandas UDF. ```shell ... --conf spark.rapids.python.gpu.enabled=true \ - --conf spark.rapids.python.memory.gpu.pooling.enabled=false \ - --conf spark.rapids.sql.exec.ArrowEvalPythonExec=true \ - --conf spark.rapids.sql.exec.MapInPandasExec=true \ - --conf spark.rapids.sql.exec.FlatMapGroupsInPandasExec=true \ - --conf spark.rapids.sql.exec.AggregateInPandasExec=true \ - --conf spark.rapids.sql.exec.FlatMapCoGroupsInPandasExec=true \ - --conf spark.rapids.sql.exec.WindowInPandasExec=true ``` -Please note the data transfer acceleration only supports scalar UDF and Scalar iterator UDF currently. -You could choose the exec you need to enable. +Please note: every type of Pandas UDF on Spark is run by a specific Spark execution plan. RAPIDS +Accelerator has a 1-1 mapping support for each of them. Not all Pandas UDF types are data-transfer +accelerated at present: + + | Spark Execution Plan|Data Transfer Accelerated|Use Case| + |----------------------|----------|--------| + |ArrowEvalPythonExec|yes|[Series to Series](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#series-to-series), [Iterator of Series to Iterator of Series](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#iterator-of-series-to-iterator-of-series) and [Iterator of Multiple Series to Iterator of Series](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#iterator-of-multiple-series-to-iterator-of-series)| + |MapInPandasExec|yes|[Map](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#map)| + | WindowInPandasExec|yes|[Window](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#series-to-scalar)| + | FlatMapGroupsInPandasExec|no|[Grouped Map](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#grouped-map)| + | AggregateInPandasExec|no|[Aggregate](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#series-to-scalar)| + |FlatMapCoGroupsInPandasExec|no|[Co-grouped Map](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#co-grouped-map)| + ### Other Configuration -Following configuration settings are also for _GPU Scheduling for Pandas UDF_ -``` -spark.rapids.python.concurrentPythonWorkers -spark.rapids.python.memory.gpu.allocFraction -spark.rapids.python.memory.gpu.maxAllocFraction -``` +The following configuration settings are also relevant for GPU scheduling for Pandas UDF. + +1. Memory efficiency + + ```shell + --conf spark.rapids.python.memory.gpu.pooling.enabled=false \ + --conf spark.rapids.python.memory.gpu.allocFraction=0.1 \ + --conf spark.rapids.python.memory.gpu.maxAllocFraction= 0.2 \ + ``` + Similar to the [RMM pooling for JVM](../tuning-guide.md#pooled-memory) settings like + `spark.rapids.memory.gpu.allocFraction` and `spark.rapids.memory.gpu.maxAllocFraction` except + these specify the GPU pool size for the _Python processes_. Half of the GPU _available_ memory + will be used by default if it is not specified. + + +2. Limit of concurrent Python processes + + ```shell + --conf spark.rapids.python.concurrentPythonWorkers=2 \ + ``` + This parameter limits the total concurrent running _Python processes_ for a Spark executor. + It defaults to 0 which means no limit. Note that for certain cases, setting + this value too small _may result in a hang for your Spark job_ because a task may contain + multiple Pandas UDF(`MapInPandas`) instances which result in multiple Python processes. + Each process will try to acquire the Python GPU process semaphore. This may result in a + deadlock situation because a Spark job will not proceed until all its tasks are finished. + + For example, in a specific Spark Stage that contains 3 Pandas UDFs, 2 Spark tasks are running + and each task launches 3 Python processes while we set this + `spark.rapids.python.concurrentPythonWorkers` to 4. + + ```python + df_1 = df_0.mapInPandas(udf_1, schema_1) + df_2 = df_1.mapInPandas(udf_2, schema_2) + df_3 = df_2.mapInPandas(udf_3, schema_3) + df_3.explain(True) + ``` + The RAPIDS Accelerator query explain: + ``` + ... + *Exec could partially run on GPU + *Exec could partially run on GPU + *Exec could partially run on GPU + ... + ``` + + ![Python concurrent worker](../img/concurrentPythonWorker.PNG) + + In this case, each Pandas UDF will launch a Python process. At this moment two Python processes + in each task(in light green) acquired their semaphore but neither of them are able to proceed + because both of them are waiting for their third semaphore to start the task. + + Another example is to use `ArrowEvalPythonExec`, with the following code: + + ```python + import pyspark.sql.functions as F + ... + df = df.withColumn("c_1",udf_1(F.col("a"), F.col("b"))) + df = df.withColumn('c_2', F.hash(F.col('c_1'))) + df = df.withColumn("c_3",udf_2(F.col("c_2"))) + ... + ``` + The physical plan: + ``` + +- GpuArrowEvalPython + +- ... + +- ... + +- GpuArrowEvalPython + ``` + This means each Spark task will trigger 2 Python processes. In this case, if we set + `spark.rapids.python.concurrentPythonWorkers=2`, it will also probably result in a hang as we + allow 2 tasks running and each of them spawns 2 Python processes. Let's say Task_1_Process_1 and + Task_2_Process_1 acquired the semaphore, but neither of them are going to proceed becasue both + of them are waiting for their second semaphore. To find details on the above Python configuration settings, please see the [RAPIDS Accelerator for -Apache Spark Configuration Guide](../configs.md). +Apache Spark Configuration Guide](../configs.md). Search 'pandas' for a quick navigation jump. diff --git a/docs/img/concurrentPythonWorker.PNG b/docs/img/concurrentPythonWorker.PNG new file mode 100644 index 00000000000..6833c4a1431 Binary files /dev/null and b/docs/img/concurrentPythonWorker.PNG differ