Skip to content

Commit

Permalink
Skip cudf_udf test by default (NVIDIA#824)
Browse files Browse the repository at this point in the history
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
  • Loading branch information
shotai authored and sperlingxx committed Nov 20, 2020
1 parent 1b0516e commit 72082af
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 64 deletions.
25 changes: 25 additions & 0 deletions integration_tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,31 @@ As an example, here is the `spark-submit` command with the TPCxBB parameters:
$SPARK_HOME/bin/spark-submit --jars "rapids-4-spark_2.12-0.3.0-SNAPSHOT.jar,cudf-0.16-SNAPSHOT.jar,rapids-4-spark-tests_2.12-0.3.0-SNAPSHOT.jar" ./runtests.py --tpcxbb_format="csv" --tpcxbb_path="/path/to/tpcxbb/csv"
```

### Enabling cudf_udf Tests

The cudf_udf tests in this framework are testing Pandas UDF(user-defined function) with cuDF. They are disabled by default not only because of the complicated environment setup, but also because GPU resources scheduling for Pandas UDF is an experimental feature now, the performance may not always be better.
The tests can be enabled by just appending the option `--cudf_udf` to the command.

* `--cudf_udf` (enable the cudf_udf tests when provided, and remove this option if you want to disable the tests)

cudf_udf tests needs a couple of different settings, they may need to run separately.

To enable cudf_udf tests, need following pre requirements:
* Install cuDF Python library on all the nodes running executors. The instruction could be found at [here](https://rapids.ai/start.html). Please follow the steps to choose the version based on your environment and install the cuDF library via Conda or use other ways like building from source.
* Disable the GPU exclusive mode on all the nodes running executors. The sample command is `sudo nvidia-smi -c DEFAULT`

To run cudf_udf tests, need following configuration changes:
* Add configurations `--py-files` and `spark.executorEnv.PYTHONPATH` to specify the plugin jar for python modules 'rapids/daemon' 'rapids/worker'.
* Decrease `spark.rapids.memory.gpu.allocFraction` to reserve enough GPU memory for Python processes in case of out-of-memory.
* Add `spark.rapids.python.concurrentPythonWorkers` and `spark.rapids.python.memory.gpu.allocFraction` to reserve enough GPU memory for Python processes in case of out-of-memory.

As an example, here is the `spark-submit` command with the cudf_udf parameter:

```
$SPARK_HOME/bin/spark-submit --jars "rapids-4-spark_2.12-0.3.0-SNAPSHOT.jar,cudf-0.16-SNAPSHOT.jar,rapids-4-spark-tests_2.12-0.3.0-SNAPSHOT.jar" --conf spark.rapids.memory.gpu.allocFraction=0.3 --conf spark.rapids.python.memory.gpu.allocFraction=0.3 --conf spark.rapids.python.concurrentPythonWorkers=2 --py-files "rapids-4-spark_2.12-0.3.0-SNAPSHOT.jar" --conf spark.executorEnv.PYTHONPATH="rapids-4-spark_2.12-0.2.0-SNAPSHOT.jar" ./runtests.py --cudf_udf
```


## Writing tests

There are a number of libraries provided to help someone write new tests.
Expand Down
3 changes: 3 additions & 0 deletions integration_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,6 @@ def pytest_addoption(parser):
parser.addoption(
"--runtime_env", action='store', default="Apache", help="the runtime environment for the tests - apache or databricks"
)
parser.addoption(
"--cudf_udf", action='store_true', default=False, help="if true enable cudf_udf test"
)
6 changes: 6 additions & 0 deletions integration_tests/src/main/python/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,3 +368,9 @@ def tpcds(request):
else:
yield TpcdsRunner(tpcds_format, tpcds_path)

@pytest.fixture(scope="session")
def enable_cudf_udf(request):
enable_udf_cudf = request.config.getoption("cudf_udf")
if not enable_udf_cudf:
pytest.skip("cudf_udf not configured to run")

124 changes: 69 additions & 55 deletions integration_tests/src/main/python/udf_cudf_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
import pandas as pd
import pytest
import time
from distutils.version import LooseVersion
from typing import Iterator
from pyspark.sql import Window
from pyspark.sql.functions import pandas_udf, PandasUDFType
from spark_init_internal import spark_version
from spark_session import with_cpu_session, with_gpu_session
from marks import allow_non_gpu, cudf_udf

pytestmark = pytest.mark.skipif(LooseVersion(spark_version()) >= LooseVersion('3.1.0'),
reason="Pandas UDF on GPU tests don't support Spark 3.1.0+ yet")


_conf = {
'spark.rapids.sql.exec.MapInPandasExec':'true',
Expand All @@ -31,11 +36,11 @@
'spark.rapids.sql.python.gpu.enabled': 'true'
}


def _create_df(spark):
return spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v")
)
elements = list(map(lambda i: (i, i/1.0), range(1, 5000)))
return spark.createDataFrame(elements * 2, ("id", "v"))


# since this test requires to run different functions on CPU and GPU(need cudf),
# create its own assert function
Expand All @@ -54,64 +59,68 @@ def _assert_cpu_gpu(cpu_func, gpu_func, cpu_conf={}, gpu_conf={}, is_sort=False)
assert cpu_ret.sort() == gpu_ret.sort()
else:
assert cpu_ret == gpu_ret



# ======= Test Scalar =======
@pandas_udf('int')
def _plus_one_cpu_func(v: pd.Series) -> pd.Series:
return v + 1


@pandas_udf('int')
def _plus_one_gpu_func(v: pd.Series) -> pd.Series:
import cudf
gpu_serises = cudf.Series(v)
gpu_serises = gpu_serises + 1
return gpu_serises.to_pandas()
gpu_series = cudf.Series(v)
gpu_series = gpu_series + 1
return gpu_series.to_pandas()


@allow_non_gpu(any=True)
@pytest.mark.skip("exception in docker: OSError: Invalid IPC stream: negative continuation token, skip for now")
@cudf_udf
def test_with_column():
def test_with_column(enable_cudf_udf):
def cpu_run(spark):
df = _create_df(spark)
return df.withColumn("v1", _plus_one_cpu_func(df.v)).collect()

def gpu_run(spark):
df = _create_df(spark)
return df.withColumn("v1", _plus_one_gpu_func(df.v)).collect()

_assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf)

@allow_non_gpu(any=True)
@pytest.mark.skip("exception in docker: OSError: Invalid IPC stream: negative continuation token, skip for now")

@cudf_udf
def test_sql():
def test_sql(enable_cudf_udf):
def cpu_run(spark):
_ = spark.udf.register("add_one_cpu", _plus_one_cpu_func)
return spark.sql("SELECT add_one_cpu(id) FROM range(3)").collect()
_create_df(spark).createOrReplaceTempView("test_table_cpu")
return spark.sql("SELECT add_one_cpu(id) FROM test_table_cpu").collect()

def gpu_run(spark):
_ = spark.udf.register("add_one_gpu", _plus_one_gpu_func)
return spark.sql("SELECT add_one_gpu(id) FROM range(3)").collect()

_create_df(spark).createOrReplaceTempView("test_table_gpu")
return spark.sql("SELECT add_one_gpu(id) FROM test_table_gpu").collect()

_assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf)


# ======= Test Scalar Iterator =======
@pandas_udf("long")
def _plus_one_cpu_iter_func(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
for s in iterator:
yield s + 1


@pandas_udf("long")
def _plus_one_gpu_iter_func(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
import cudf
for s in iterator:
gpu_serises = cudf.Series(s)
gpu_serises = gpu_serises + 1
yield gpu_serises.to_pandas()

@allow_non_gpu(any=True)
@pytest.mark.skip("exception in docker: OSError: Invalid IPC stream: negative continuation token, skip for now")


@cudf_udf
def test_select():
def test_select(enable_cudf_udf):
def cpu_run(spark):
df = _create_df(spark)
return df.select(_plus_one_cpu_iter_func(df.v)).collect()
Expand All @@ -123,95 +132,98 @@ def gpu_run(spark):
_assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf)


@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746")
# ======= Test Flat Map In Pandas =======
@allow_non_gpu('GpuMapInPandasExec','PythonUDF')
@cudf_udf
def test_map_in_pandas():
def test_map_in_pandas(enable_cudf_udf):
def cpu_run(spark):
df = _create_df(spark)
def _filter_cpu_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df = _create_df(spark)
return df.mapInPandas(_filter_cpu_func, df.schema).collect()

def gpu_run(spark):
df = _create_df(spark)
def _filter_gpu_func(iterator):
import cudf
for pdf in iterator:
gdf = cudf.from_pandas(pdf)
yield gdf[gdf.id == 1].to_pandas()
df = _create_df(spark)
return df.mapInPandas(_filter_gpu_func, df.schema).collect()

_assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf)


# ======= Test Grouped Map In Pandas =======
# To solve: Invalid udf: the udf argument must be a pandas_udf of type GROUPED_MAP
# need to add udf type
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def _normalize_cpu_func(df):
v = df.v
return df.assign(v=(v - v.mean()) / v.std())


@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def _normalize_gpu_func(df):
import cudf
gdf = cudf.from_pandas(df)
v = gdf.v
return gdf.assign(v=(v - v.mean()) / v.std()).to_pandas()

@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746")

@allow_non_gpu('GpuFlatMapGroupsInPandasExec','PythonUDF')
@cudf_udf
def test_group_apply():
def test_group_apply(enable_cudf_udf):
def cpu_run(spark):
df = _create_df(spark)
return df.groupby("id").apply(_normalize_cpu_func).collect()

def gpu_run(spark):
df = _create_df(spark)
return df.groupby("id").apply(_normalize_gpu_func).collect()

_assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True)


@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746")
@allow_non_gpu('GpuFlatMapGroupsInPandasExec','PythonUDF')
@cudf_udf
def test_group_apply_in_pandas():
def test_group_apply_in_pandas(enable_cudf_udf):
def cpu_run(spark):
df = _create_df(spark)
def _normalize_cpu_in_pandas_func(df):
v = df.v
return df.assign(v=(v - v.mean()) / v.std())
df = _create_df(spark)
return df.groupby("id").applyInPandas(_normalize_cpu_in_pandas_func, df.schema).collect()

def gpu_run(spark):
df = _create_df(spark)
def _normalize_gpu_in_pandas_func(df):
import cudf
gdf = cudf.from_pandas(df)
v = gdf.v
return gdf.assign(v=(v - v.mean()) / v.std()).to_pandas()
df = _create_df(spark)
return df.groupby("id").applyInPandas(_normalize_gpu_in_pandas_func, df.schema).collect()

_assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True)


# ======= Test Aggregate In Pandas =======
@pandas_udf("int")
def _sum_cpu_func(v: pd.Series) -> int:
return v.sum()


@pandas_udf("integer")
def _sum_gpu_func(v: pd.Series) -> int:
import cudf
gpu_serises = cudf.Series(v)
return gpu_serises.sum()
gpu_series = cudf.Series(v)
return gpu_series.sum()


@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746")
@allow_non_gpu('GpuAggregateInPandasExec','PythonUDF','Alias')
@cudf_udf
def test_group_agg():
def test_group_agg(enable_cudf_udf):
def cpu_run(spark):
df = _create_df(spark)
return df.groupby("id").agg(_sum_cpu_func(df.v)).collect()
Expand All @@ -223,10 +235,9 @@ def gpu_run(spark):
_assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True)


@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746")
@allow_non_gpu('GpuAggregateInPandasExec','PythonUDF','Alias')
@cudf_udf
def test_sql_group():
def test_sql_group(enable_cudf_udf):
def cpu_run(spark):
_ = spark.udf.register("sum_cpu_udf", _sum_cpu_func)
q = "SELECT sum_cpu_udf(v1) FROM VALUES (3, 0), (2, 0), (1, 1) tbl(v1, v2) GROUP BY v2"
Expand All @@ -240,10 +251,11 @@ def gpu_run(spark):
_assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True)


@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746")
@allow_non_gpu('GpuWindowInPandasExec','PythonUDF','Alias','WindowExpression','WindowSpecDefinition','SpecifiedWindowFrame','UnboundedPreceding$', 'UnboundedFollowing$')
# ======= Test Window In Pandas =======
@allow_non_gpu('GpuWindowInPandasExec','PythonUDF','Alias','WindowExpression','WindowSpecDefinition',
'SpecifiedWindowFrame','UnboundedPreceding$', 'UnboundedFollowing$')
@cudf_udf
def test_window():
def test_window(enable_cudf_udf):
def cpu_run(spark):
df = _create_df(spark)
w = Window.partitionBy('id').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
Expand All @@ -254,37 +266,39 @@ def gpu_run(spark):
w = Window.partitionBy('id').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
return df.withColumn('sum_v', _sum_gpu_func('v').over(w)).collect()

_assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True)
_assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True)


@pytest.mark.skip("https://github.com/NVIDIA/spark-rapids/issues/746")
# ======= Test CoGroup Map In Pandas =======
@allow_non_gpu('GpuFlatMapCoGroupsInPandasExec','PythonUDF')
@cudf_udf
def test_cogroup():
def test_cogroup(enable_cudf_udf):
def cpu_run(spark):
def _cpu_join_func(l, r):
return pd.merge(l, r, on="time")
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def _cpu_join_func(l, r):
return pd.merge(l, r, on="time")
return df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(_cpu_join_func, schema="time int, id_x int, id_y int, v1 double, v2 string").collect()
return df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(_cpu_join_func,
schema="time int, id_x int, id_y int, v1 double, v2 string").collect()

def gpu_run(spark):
def _gpu_join_func(l, r):
import cudf
gl = cudf.from_pandas(l)
gr = cudf.from_pandas(r)
return gl.merge(gr, on="time").to_pandas()
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def _gpu_join_func(l, r):
import cudf
gl = cudf.from_pandas(l)
gr = cudf.from_pandas(r)
return gl.merge(gr, on="time").to_pandas()
return df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(_gpu_join_func, schema="time int, id_x int, id_y int, v1 double, v2 string").collect()
return df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(_gpu_join_func,
schema="time int, id_x int, id_y int, v1 double, v2 string").collect()

_assert_cpu_gpu(cpu_run, gpu_run, gpu_conf=_conf, is_sort=True)

Expand Down
4 changes: 2 additions & 2 deletions jenkins/spark-premerge-build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ tar zxf $SPARK_HOME.tgz -C $ARTF_ROOT && \

mvn -U -B $MVN_URM_MIRROR '-P!snapshot-shims' clean verify -Dpytest.TEST_TAGS=''
# Run the unit tests for other Spark versions but dont run full python integration tests
env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Pspark301tests,snapshot-shims test -Dpytest.TEST_TAGS='not cudf_udf'
env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Pspark310tests,snapshot-shims test -Dpytest.TEST_TAGS='not cudf_udf'
env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Pspark301tests,snapshot-shims test -Dpytest.TEST_TAGS=''
env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Pspark310tests,snapshot-shims test -Dpytest.TEST_TAGS=''

# The jacoco coverage should have been collected, but because of how the shade plugin
# works and jacoco we need to clean some things up so jacoco will only report for the
Expand Down
Loading

0 comments on commit 72082af

Please sign in to comment.