Update PandasUDF doc #2089

Merged 13 commits into NVIDIA:branch-0.5 on Apr 20, 2021

Conversation

@wjxiz1992 (Collaborator) commented Apr 7, 2021

Fixes #2053.
Update the Pandas UDF doc with a more detailed description.

Signed-off-by: Allen Xu wjxiz1992@gmail.com


- **Share GPU with JVM**: Let the Python process share JVM GPU. The Python process could run on the
same GPU with JVM.
- **GPU Assignment(Scheduling) in Python Process**: Let the Python process share the same GPU with Spark executor JVM. Without this feature, some use case in PandasUDF(a.k.a an `independent` process) will likely to use other GPUs other than the one we want it to run on. e.g. user can launch a TensorFlow session inside Pandas UDF and the machine contains 8 GPUs. user launchs 8 Spark executors. Without this GPU sharing feature, TensorFlow will automatically use all 8 GPUs it can detects which will definitly conflict with existing Spark executor JVM processes.
Collaborator:

Suggested change
- **GPU Assignment(Scheduling) in Python Process**: Let the Python process share the same GPU with Spark executor JVM. Without this feature, some use case in PandasUDF(a.k.a an `independent` process) will likely to use other GPUs other than the one we want it to run on. e.g. user can launch a TensorFlow session inside Pandas UDF and the machine contains 8 GPUs. user launchs 8 Spark executors. Without this GPU sharing feature, TensorFlow will automatically use all 8 GPUs it can detects which will definitly conflict with existing Spark executor JVM processes.
- **GPU Assignment(Scheduling) in Python Process**: Let the Python process share the same GPU with Spark executor JVM. Without this feature, in a non-isolated environment, some use cases with PandasUDF (a.k.a an `independent` process) can try to use GPUs other than the one we want it to run on. For example, the user could launch a TensorFlow session inside Pandas UDF and the machine contains 8 GPUs. Without this GPU sharing feature, TensorFlow will automatically use all 8 GPUs which will conflict with existing Spark executor JVM processes.

Collaborator Author:

Done.

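A minimal, hypothetical sketch (not from the PR or the doc under review) of the scenario described in this thread: a Pandas UDF whose Python worker initializes a GPU framework. The function name `gpu_score` and the `CUDA_VISIBLE_DEVICES` check are illustrative assumptions only; with GPU scheduling for Pandas UDF enabled, the worker is expected to see just the executor's GPU rather than every device on the node.

```python
# Hypothetical illustration of the GPU assignment scenario discussed above.
# Assumes PySpark >= 3.0; the environment-variable check is an assumption,
# not a documented plugin guarantee.
import os
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def gpu_score(v: pd.Series) -> pd.Series:
    # Without GPU sharing/assignment, a framework such as TensorFlow started
    # here could see and try to claim every GPU on the node, conflicting with
    # the Spark executor JVMs. With the feature enabled, the Python worker
    # should only see the executor's GPU.
    visible = os.environ.get("CUDA_VISIBLE_DEVICES", "<unset>")
    print(f"Python worker visible GPUs: {visible}")  # goes to the worker log
    return v * 2.0  # placeholder for real GPU work
```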

```
--conf spark.rapids.sql.exec.ArrowEvalPythonExec=true \
--conf spark.rapids.sql.exec.MapInPandasExec=false \
--conf spark.rapids.sql.exec.WindowInPandasExec=true
```

Collaborator:

did you mean to have these false? or are these false due to performance?

Collaborator Author:

Removed these and added a status table for them for a better view.

These configs are the switches for each type of PandasUDF execution plan. Some of theme are set to false by default due to not supported or performance issue.
Collaborator:

s/theme/them/

Collaborator Author:

Removed as suggested.

```
--conf spark.rapids.python.memory.gpu.allocFraction=0.1 \
--conf spark.rapids.python.memory.gpu.maxAllocFraction= 0.2 \
```
Same to the [RMM pooling for JVM](../tuning-guide.md#pooled-memory), here the pooling serves the same way but for Python process. `half of the rest GPU memory` will be used by default if it is not specified.
Collaborator:

To be clear here, is this saying the Python process will assume it can use half the memory of the GPU? I think we should mention the other spark.rapids.memory.gpu.allocFraction setting here.

Collaborator Author:

Updated.

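A hedged sketch of how the Python-side pool settings in this snippet could be set alongside the JVM-side spark.rapids.memory.gpu.allocFraction mentioned in the review comment. The values are arbitrary placeholders rather than recommendations, and the snippet is not taken from the doc under review.

```python
# Illustrative only: sets both the JVM-side RMM pool fraction and the
# Python-worker pool fractions discussed above. The specific values are
# assumptions, chosen so the JVM pool leaves GPU memory for Python workers.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.rapids.memory.gpu.allocFraction", "0.5")          # JVM pool
    .config("spark.rapids.python.memory.gpu.allocFraction", "0.1")   # Python pool
    .config("spark.rapids.python.memory.gpu.maxAllocFraction", "0.2")
    .getOrCreate()
)
```
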
```
--conf spark.rapids.python.concurrentPythonWorkers=2 \
```
This parameter aims to limit the total concurrent running `Python process` in 1 Spark executor. This parameter is set to 0 by default which means there's not limit for concurrent Python workers. Note that for certain cases, setting this value too small may result a `hang` for your Spark job because a PandasUDF may produces multiple python process and each will try to acquire the python GPU process semaphore. This may bring a dead lock situation becasue a Spark job will not preceed until all its tasks are finished. For example, in the Pandas UDF stage, 2 tasks are running and each task launches 3 Python process and we set this parameter to 4.
Collaborator:

I'm a little confused by this: how do I know how many Python processes I need? On Spark I usually get one Python process per task. Can we add more detail here?

Collaborator Author:

I made a wrong description here, sorry for that. I discussed this with Liangcai today and restated it with a more specific example.

@wjxiz1992 (Collaborator Author):

Hi @jlowe @tgravescs @firestarman, I updated the doc; the main modifications are:

  1. Add an Exec support status table; this aims to help users know which exec switch they should turn on and see the support status.
  2. Remove unnecessary parts.
  3. Refine the concurrentPythonWorkers part with a more detailed example.

Item 3 is the most difficult to explain as it relies heavily on the implementation; please help check whether that part is clear for readers.
Thanks a lot!

| Spark Execution Plan | Use Case | Status |
|----------------------|----------|--------|
|ArrowEvalPythonExec|[Series to Series](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#series-to-series), [Iterator of Series to Iterator of Series](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#series-to-series) and [Iterator of Multiple Series to Iterator of Series](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#iterator-of-multiple-series-to-iterator-of-series)| supported|
|MapInPandasExec| [Map](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#map)| supported|
| WindowInPandasExec | [Window](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#series-to-scalar)| supported|
Collaborator:

nit: looks like we have extra spaces in a couple of these; we may also switch that column to be second and have the use case third.

Collaborator Author:

Switched.

Accelerator has a 1-1 mapping support for each of them. Not all PandasUDF types are data-transfer
accelerated at present:

| Spark Execution Plan | Use Case | Status |
Collaborator:

Can we change Status to be "data transfer accelerated" or "accelerated", just to be more clear?

Collaborator Author:

Used "data transfer accelerated".

![Python concurrent worker](/docs/img/concurrentPythonWorker.PNG)

In this case, each PandasUDF will launch a Python process. At this moment two python process
in each task acquired their semaphore but neither of them are able to proceed becasue both
Collaborator:

because is misspelled

Collaborator Author:

Done.

@jlowe self-requested a review on April 8, 2021 16:52
@jlowe (Member) left a comment:


Just a small update but otherwise lgtm.

@sameerz (Collaborator) left a comment:


Looks good to me, aside from the requested changes.

@sameerz added the documentation (Improvements or additions to documentation) label on Apr 8, 2021
@wjxiz1992 (Collaborator Author):

Added one more example for GpuArrowEvalPython as suggested by the team, along with some doc cleanup.

@firestarman (Collaborator):

LGTM

@jlowe previously approved these changes Apr 9, 2021
1. Make sure GPU exclusive mode is disabled. Note that this will not work if you are using exclusive
mode to assign GPUs under spark.
2. Currently the python files are packed into the spark rapids plugin jar.
1. Make sure GPU `exclusive` mode is disabled. Note that this will not work if you are using exclusive
Contributor:

For On-prem YARN instructions, we add a line "refer to nvidia-smi documentation".

Do we need to add the link or sample like this?
nvidia-smi -i 0 -c EXCLUSIVE_PROCESS # Set GPU 0 to exclusive mode, run as root.

Also, if this is mandatory for YARN, should we say that more explicitly?

Collaborator Author:

Thanks for the review!
Added a line to show the command to set the GPU to Default mode.
I think there is no need to say whether it is for YARN or another environment. The mode here only applies to the GPU hardware: if the GPU is in Default mode, it stays in Default mode regardless of whether Spark runs on YARN, Standalone, or Local.

same GPU with JVM.
- **GPU Assignment(Scheduling) in Python Process**: Let the Python process share the same GPU with
Spark executor JVM. Without this feature, in a non-isolated environment, some use cases with
PandasUDF (a.k.a an `independent` process) can try to use GPUs other than the one we want it to
Collaborator:

Suggested change
PandasUDF (a.k.a an `independent` process) can try to use GPUs other than the one we want it to
Pandas UDF (an `independent` python daemon process) can try to use GPUs other than the one we want it to

@@ -174,31 +178,102 @@ To enable _GPU Scheduling for Pandas UDF_, you need to configure your spark job
```
--py-files ${SPARK_RAPIDS_PLUGIN_JAR}
```
Collaborator:

Could you also change the standalone part's rapids-4-spark_2.12-0.5.0-SNAPSHOT.jar to ${SPARK_RAPIDS_PLUGIN_JAR} ?


Please note the data transfer acceleration only supports scalar UDF and Scalar iterator UDF currently.
You could choose the exec you need to enable.
Please note: every type of PandasUDF on Spark is run by a specific Spark execution plan. RAPIDS
Collaborator:

Suggested change
Please note: every type of PandasUDF on Spark is run by a specific Spark execution plan. RAPIDS
Please note: every type of Pandas UDF on Spark is run by a specific Spark execution plan. RAPIDS

Please note the data transfer acceleration only supports scalar UDF and Scalar iterator UDF currently.
You could choose the exec you need to enable.
Please note: every type of PandasUDF on Spark is run by a specific Spark execution plan. RAPIDS
Accelerator has a 1-1 mapping support for each of them. Not all PandasUDF types are data-transfer
Collaborator:

Suggested change
Accelerator has a 1-1 mapping support for each of them. Not all PandasUDF types are data-transfer
Accelerator has a 1-1 mapping support for each of them. Not all Pandas UDF types are data-transfer


| Spark Execution Plan|Data Transfer Accelerated|Use Case|
|----------------------|----------|--------|
|ArrowEvalPythonExec|yes|[Series to Series](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#series-to-series), [Iterator of Series to Iterator of Series](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#series-to-series) and [Iterator of Multiple Series to Iterator of Series](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#iterator-of-multiple-series-to-iterator-of-series)| supported|
Collaborator:

Suggested change
|ArrowEvalPythonExec|yes|[Series to Series](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#series-to-series), [Iterator of Series to Iterator of Series](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#series-to-series) and [Iterator of Multiple Series to Iterator of Series](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#iterator-of-multiple-series-to-iterator-of-series)| supported|
|ArrowEvalPythonExec|yes|[Series to Series](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#series-to-series), [Iterator of Series to Iterator of Series](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#iterator-of-series-to-iterator-of-series) and [Iterator of Multiple Series to Iterator of Series](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#iterator-of-multiple-series-to-iterator-of-series)| supported|

deadlock situation because a Spark job will not proceed until all its tasks are finished.

For example, in a specific Spark Stage that contais 3 PandasUDFs, 2 Spark tasks are running and
each task launches 3 Python process while we set this `concurrentPythonWorkers` to 4.
Collaborator:

Suggested change
each task launches 3 Python process while we set this `concurrentPythonWorkers` to 4.
each task launches 3 Python processes while we set this `spark.rapids.python.concurrentPythonWorkers` to 4.


![Python concurrent worker](../img/concurrentPythonWorker.PNG)

In this case, each PandasUDF will launch a Python process. At this moment two Python process
Collaborator:

Suggested change
In this case, each PandasUDF will launch a Python process. At this moment two Python process
In this case, each Pandas UDF will launch a Python process. At this moment two Python processes

in each task(in light green) acquired their semaphore but neither of them are able to proceed
because both of them are waiting for their third semaphore to start the task.

Another example is to use ArrowEvalPythonExec, with the following code:
Collaborator:

Suggested change
Another example is to use ArrowEvalPythonExec, with the following code:
Another example is to use `ArrowEvalPythonExec` with the following code:

```
+- GpuArrowEvalPython
```
This means each Spark task will trigger 2 Python processes. In this case, if we set
`concurrentPythonWorkers=2`, it will also probably result in a hang as we allow 2 tasks running
Collaborator:

Suggested change
`concurrentPythonWorkers=2`, it will also probably result in a hang as we allow 2 tasks running
`spark.rapids.python.concurrentPythonWorkers=2`, it will also probably result in a hang as we allow 2 tasks running

This means each Spark task will trigger 2 Python processes. In this case, if we set
`concurrentPythonWorkers=2`, it will also probably result in a hang as we allow 2 tasks running
and each of them has 2 Python processes. Let's say Task_1_Process_1 and Task_2_Process_1
Collaborator:

Suggested change
and each of them has 2 Python processes. Let's say Task_1_Process_1 and Task_2_Process_1
and each of them spawns 2 Python processes. Let's say Task_1_Process_1 and Task_2_Process_1

Collaborator Author:

Thanks for the review! Updated.
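
For readers following this thread, here is a hypothetical PySpark sketch of the kind of query being discussed: two dependent Pandas UDFs which, depending on how Spark plans the stage, can end up in separate (Gpu)ArrowEvalPython nodes and therefore drive more than one Python worker per task. The UDF names `plus_one` and `times_two` are illustrative assumptions, not the snippet from the doc.

```python
# Hypothetical sketch: two dependent Pandas UDFs in one query. Depending on
# how Spark plans the stage, each task can end up driving two Python workers,
# which is the situation the concurrentPythonWorkers discussion is about.
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, col

spark = SparkSession.builder.getOrCreate()

@pandas_udf("long")
def plus_one(v: pd.Series) -> pd.Series:
    return v + 1

@pandas_udf("long")
def times_two(v: pd.Series) -> pd.Series:
    return v * 2

df = spark.range(0, 1000)
# times_two consumes the output of plus_one, so the two UDFs cannot be
# evaluated together in a single Python worker invocation.
out = df.select(times_two(plus_one(col("id"))).alias("out"))
out.explain()  # with the plugin enabled, look for GpuArrowEvalPython nodes
```

Under that assumption, setting spark.rapids.python.concurrentPythonWorkers to 2 while two such tasks run concurrently could leave each task holding one worker slot and waiting for a second, which is the hang scenario described above.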

@sameerz previously approved these changes Apr 13, 2021
@wjxiz1992 requested a review from viadea on April 16, 2021 01:57
@viadea previously approved these changes Apr 18, 2021
@sameerz (Collaborator) commented Apr 20, 2021:

build

@sameerz merged commit a50f9dd into NVIDIA:branch-0.5 on Apr 20, 2021
@viadea (Collaborator) commented May 14, 2021:

@firestarman @wjxiz1992 Just found a possible typo here: `spark.rapids.python.gpu.enabled` should be changed to `spark.rapids.sql.python.gpu.enabled`. If you agree, do you want to create a PR to correct the 0.5 and 0.6 docs? Thanks.

@revans2 (Collaborator) commented May 14, 2021:

> @firestarman @wjxiz1992 Just found a possible typo here: `spark.rapids.python.gpu.enabled` should be changed to `spark.rapids.sql.python.gpu.enabled`. If you agree, do you want to create a PR to correct the 0.5 and 0.6 docs? Thanks.

Yes, it should be `spark.rapids.sql.python.gpu.enabled`. That is what is in the code for the config.

nartal1 pushed a commit to nartal1/spark-rapids that referenced this pull request Jun 9, 2021
* Update PandasUDF doc

Update Pandas UDF doc with more details description

Signed-off-by: Allen Xu <wjxiz1992@gmail.com>

* Resolve comments

* More doc clean

* doc clean and table reformat

* doc clean

* doc clean

* Doc update

* resolve comments

* Resolve comments

* Resolve comments

* resolve comments

* resolve comments

* Resolve comments
Labels: documentation (Improvements or additions to documentation)

Successfully merging this pull request may close these issues.

[DOC] pandas udf section name confusing
8 participants