Skip to content

Latest commit

 

History

History
458 lines (344 loc) · 26.5 KB

README.md

File metadata and controls

458 lines (344 loc) · 26.5 KB

RAPIDS Plugin For Apache Spark Integration Tests

This is a set of integration tests for the RAPIDS Plugin for Apache Spark. These tests are intended to be able to be run against any Spark-compatible cluster/release to help verify that the plugin is doing the right thing in as many cases as possible.

There are two sets of tests here. The pyspark tests are described here. The scala tests are described here

Dependencies

The tests are based off of pyspark and pytest running on Python 3. There really are only a small number of Python dependencies that you need to install for the tests. The dependencies also only need to be on the driver. You can install them on all nodes in the cluster but it is not required.

pytest

pip install pytest

Should be enough to get the basics started.

sre_yield

pip install sre_yield

sre_yield provides a set of APIs to generate string data from a regular expression.

pandas

pip install pandas

pandas is a fast, powerful, flexible and easy to use open source data analysis and manipulation tool and is only needed when testing integration with pandas.

pyarrow

pip install pyarrow

pyarrow provides a Python API for functionality provided by the Arrow C++ libraries, along with tools for Arrow integration and interoperability with pandas, NumPy, and other software in the Python ecosystem. This is used to test improved transfer performance to pandas based user defined functions.

pytest-xdist and findspark

pytest-xdist and findspark can be used to speed up running the tests by running them in parallel.

Running

Tests will run as a part of the maven build if you have the environment variable SPARK_HOME set.

The suggested way to run these tests is to use the shell-script file located in the integration_tests folder called run_pyspark_from_build.sh. This script takes care of some of the flags that are required to run the tests which will have to be set for the plugin to work. It will be very useful to read the contents of the run_pyspark_from_build.sh to get a better insight into what is needed as we constantly keep working on to improve and expand the plugin-support.

The python tests run with pytest and the script honors pytest parameters. Some handy flags are:

  • -k . This will run all the tests in that test file.
  • -k . This will also run an individual test.
  • -s Doesn't capture the output and instead prints to the screen.
  • -v Increase the verbosity of the tests
  • -r fExXs Show extra test summary info as specified by chars: (f)ailed, (E)rror, (x)failed, (X)passed, (s)kipped
  • For other options and more details please visit pytest-usage or type pytest --help

Spark execution mode

Spark Applications (pytest in this case) can be run against different cluster backends specified by the configuration spark.master. It can be provided by various means such as via --master argument of spark-submit.

By default, the local mode is used to run the Driver and Executors in the same JVM. Albeit convenient, this mode sometimes masks problems occurring in fully distributed production deployments. These are often bugs related to object serialization and hash code implementation.

Thus, Apache Spark provides another lightweight way to test applications in the pseudo-distributed local-cluster[numWorkers,coresPerWorker,memoryPerWorker] mode where executors are run in separate JVMs on your local machine.

The following environment variables control the behavior in the run_pyspark_from_build.sh script

  • NUM_LOCAL_EXECS if set to a positive integer value activates the local-cluster mode and sets the number of workers to NUM_LOCAL_EXECS
  • CORES_PER_EXEC determines the number of cores per executor if local-cluster is activated
  • MB_PER_EXEC determines the amount of memory per executor in megabyte if local-cluster is activated

Pytest execution mode

By default the tests try to use the python packages pytest-xdist and findspark to oversubscribe your GPU and run the tests in Spark local mode. This can speed up these tests significantly as all of the tests that run by default process relatively small amounts of data. Be careful because if you have SPARK_CONF_DIR also set the tests will try to use whatever cluster you have configured. If you do want to run the tests in parallel on an existing cluster it is recommended that you set -Dpytest.TEST_PARALLEL to one less than the number of worker applications that will be running on the cluster. This is because pytest-xdist will launch one control application that is not included in that number. All it does is farm out work to the other applications, but because it needs to know about the Spark cluster to determine which tests to run and how it still shows up as a Spark application.

To run the tests separate from the build go to the integration_tests directory. You can submit runtests.py through spark-submit, but if you want to run the tests in parallel with pytest-xdist you will need to submit it as a regular python application and have findspark installed. Be sure to include the necessary jars for the RAPIDS plugin either with spark-submit or with the cluster when it is setup. The command line arguments to runtests.py are the same as for pytest. The only reason we have a separate script is that spark-submit uses python if the file name ends with .py.

If you want to configure the Spark cluster you may also set environment variables for the tests. The name of the env var should be in the form "PYSP_TEST_" + conf_key.replace('.', '_'). Linux does not allow '.' in the name of an environment variable so we replace it with an underscore. As Spark configs avoid this character we have no other special processing.

We also have a large number of integration tests that currently run as a part of the unit tests using scala test. Those are in the src/test/scala sub-directory and depend on the testing framework from the rapids-4-spark-tests_2.12 test jar.

You can run these tests against a cluster similar to how you can run pytests against an existing cluster. To do this you need to launch a cluster with the plugin jars on the classpath. The tests will enable and disable the plugin as they run.

Next you need to copy over some test files to whatever distributed file system you are using. The test files are everything under ./integration_tests/src/test/resources/ Be sure to note where you placed them because you will need to tell the tests where they are.

When running these tests you will need to include the test jar, the integration test jar, the udf-examples jar, scala-test and scalactic. You can find scala-test and scalactic under ~/.m2/repository.

It is recommended that you use spark-shell and the scalatest shell to run each test individually, so you don't risk running unit tests along with the integration tests. http://www.scalatest.org/user_guide/using_the_scalatest_shell

spark-shell --jars rapids-4-spark-tests_2.12-21.10.0-SNAPSHOT-tests.jar,rapids-4-spark-udf-examples_2.12-21.10.0-SNAPSHOT.jar,rapids-4-spark-integration-tests_2.12-21.10.0-SNAPSHOT-tests.jar,scalatest_2.12-3.0.5.jar,scalactic_2.12-3.0.5.jar

First you import the scalatest_shell and tell the tests where they can find the test files you just copied over.

import org.scalatest._
com.nvidia.spark.rapids.TestResourceFinder.setPrefix(PATH_TO_TEST_FILES)

Next you can start to run the tests.

durations.run(new com.nvidia.spark.rapids.JoinsSuite)

Most clusters probably will not have the RAPIDS plugin installed in the cluster yet. If you just want to verify the SQL replacement is working you will need to add the rapids-4-spark and cudf jars to your spark-submit command. Note the following example assumes CUDA 11.0 is being used.

$SPARK_HOME/bin/spark-submit --jars "rapids-4-spark_2.12-21.10.0-SNAPSHOT.jar,rapids-4-spark-udf-examples_2.12-21.10.0-SNAPSHOT.jar,cudf-21.10.0-SNAPSHOT-cuda11.jar" ./runtests.py

You don't have to enable the plugin for this to work, the test framework will do that for you.

You do need to have access to a compatible GPU with the needed CUDA drivers. The exact details of how to set this up are beyond the scope of this document, but the Spark feature for scheduling GPUs does make this very simple if you have it configured.

Runtime Environment

--runtime_env is used to specify the environment you are running the tests in. Valid values are databricks,emr,dataproc and apache. This is generally used when certain environments have different behavior, and the tests don't have a good way to auto-detect the environment yet.

timezone

The RAPIDS plugin currently only supports the UTC time zone. Spark uses the default system time zone unless explicitly set otherwise. To make sure that the tests work properly you need to configure your cluster or application to run with UTC. The python framework cannot always do this for you because it risks overwriting other java options in the config. Please be sure that the following configs are set when running the tests.

  • spark.driver.extraJavaOptions should include -Duser.timezone=UTC
  • spark.executor.extraJavaOptions should include -Duser.timezone=UTC
  • spark.sql.session.timeZone=UTC

Running in parallel

You may use pytest-xdist to run the tests in parallel. This is done by running the tests through python, not spark-submit, and setting the parallelism with the -n command line parameter. Be aware that pytest-xdist will launch one control application and the given number of worker applications, so your cluster needs to be large enough to handle one more application than the parallelism you set. Most tests are small and don't need even a full GPU to run. So setting your applications to use a single executor and a single GPU per executor is typically enough. When running from maven we assume that we are running in local mode and will try to oversubscribe a single GPU. Typically we find that the tests don't need more than 2GB of GPU memory so we can speed up the tests significantly by doing this. It is not easy nor recommended to try and configure an actual cluster so you can oversubscribe GPUs. Please don't try it.

Under YARN and Kubernetes you can set spark.executor.instances to the number of executors you want running in your application (1 typically). Spark will auto launch a driver for each application too, but if you configured it correctly that would not take any GPU resources on the cluster. For standalone, Mesos, and Kubernetes you can set spark.cores.max to one more than the number of executors you want to use per application. The extra core is for the driver. Dynamic allocation can mess with these settings under YARN and even though it is off by default you probably want to be sure it is disabled (spark.dynamicAllocation.enabled=false).

Running with Alternate Paths

In case your test jars and resources are downloaded to the local-path from dependency Repo, and you want to run tests with them using the shell-script run_pyspark_from_build.sh, then the LOCAL_JAR_PATH=local-path must be set to point to the local-path, e.g. LOCAL_JAR_PATH=local-path bash [run_pyspark_from_build.sh](run_pyspark_from_build.sh).By setting LOCAL_JAR_PATH=local-path the shell-script run_pyspark_from_build.sh can find the test jars and resources in the alternate path.

When running the shell-script run_pyspark_from_build.sh under YARN or Kubernetes, the $SCRIPTPATH in the python options --rootdir $SCRIPTPATH ... and --std_input_path $SCRIPTPATH ... will not work, as the $SCRIPTPATH is a local path, you need to overwrite it to the clould paths. Basically, you need first to upload the test resources onto the cloud path resource-path, then transfer the test resources onto the working directory root-dir of each executor(e.g. via spark-submit --files root-dir ...). After that you must set both LOCAL_ROOTDIR=root-dir and INPUT_PATH=resource-path to run the shell-script, e.g. LOCAL_ROOTDIR=root-dir INPUT_PATH=resource-path bash [run_pyspark_from_build.sh](run_pyspark_from_build.sh).

Reviewing integration tests in Spark History Server

If the integration tests are run using run_pyspark_from_build.sh we have the event log enabled by default. You can opt out by setting the environment variable SPARK_EVENTLOG_ENABLED to false.

Compressed event logs will appear under the run directories of the form integration_tests/target/run_dir/eventlog_WORKERID. If xdist is not used (e.g., TEST_PARALLEL=1) the event log directory will be integration_tests/target/run_dir/eventlog_gw0 as if executed by worker 0 under xdist.

To review all the tests run by a particular worker you can start the History Server as follows:

SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=integration_tests/target/run_dir/eventlog_gw0" \
  ${SPARK_HOME}/bin/spark-class org.apache.spark.deploy.history.HistoryServer

By default, integration tests write event logs using Zstandard (zstd) compression codec. It can be changed by setting the environment variable PYSP_TEST_spark_eventLog_compression_codec to one of the SHS supported values for the config key spark.eventLog.compression.codec

With zstd it's easy to view / decompress event logs using the CLI zstd -d [--stdout] <file> even without the SHS webUI.

Enabling cudf_udf Tests

The cudf_udf tests in this framework are testing Pandas UDF(user-defined function) with cuDF. They are disabled by default not only because of the complicated environment setup, but also because GPU resources scheduling for Pandas UDF is an experimental feature now, the performance may not always be better. The tests can be enabled by just appending the option --cudf_udf to the command.

  • --cudf_udf (enable the cudf_udf tests when provided, and remove this option if you want to disable the tests)

cudf_udf tests needs a couple of different settings, they may need to run separately.

To enable cudf_udf tests, need following pre requirements:

  • Install cuDF Python library on all the nodes running executors. The instruction could be found at here. Please follow the steps to choose the version based on your environment and install the cuDF library via Conda or use other ways like building from source.
  • Disable the GPU exclusive mode on all the nodes running executors. The sample command is sudo nvidia-smi -c DEFAULT

To run cudf_udf tests, need following configuration changes:

  • Add configurations --py-files and spark.executorEnv.PYTHONPATH to specify the plugin jar for python modules 'rapids/daemon' 'rapids/worker'.
  • Decrease spark.rapids.memory.gpu.allocFraction to reserve enough GPU memory for Python processes in case of out-of-memory.
  • Add spark.rapids.python.concurrentPythonWorkers and spark.rapids.python.memory.gpu.allocFraction to reserve enough GPU memory for Python processes in case of out-of-memory.

As an example, here is the spark-submit command with the cudf_udf parameter on CUDA 11.0:

$SPARK_HOME/bin/spark-submit --jars "rapids-4-spark_2.12-21.10.0-SNAPSHOT.jar,rapids-4-spark-udf-examples_2.12-21.10.0-SNAPSHOT.jar,cudf-21.10.0-SNAPSHOT-cuda11.jar,rapids-4-spark-tests_2.12-21.10.0-SNAPSHOT.jar" --conf spark.rapids.memory.gpu.allocFraction=0.3 --conf spark.rapids.python.memory.gpu.allocFraction=0.3 --conf spark.rapids.python.concurrentPythonWorkers=2 --py-files "rapids-4-spark_2.12-21.10.0-SNAPSHOT.jar" --conf spark.executorEnv.PYTHONPATH="rapids-4-spark_2.12-21.10.0-SNAPSHOT.jar" ./runtests.py --cudf_udf

Writing tests

There are a number of libraries provided to help someone write new tests.

data_gen.py

data_gen allow you to generate data for various spark types. There tends to be a ${Type}Gen class for every ${Type}Type class supported by the plugin. Each of these has decent default values, so a DoubleGen should produce a random set of DoubleType including most corner cases like -0.0, NaN, Inf and -Inf.

Many of the classes also allow for some customization of the data produced.

All of the classes allow you to add in your own corner cases using with_special_case. So if you want an IntegerGen that produces a lot of 5s you could run.

IntegerGen().with_special_case(5, weight=200)

The value passed in can be a constant value, or it can be a function that takes a random.Random instance to generate the data. The weight lets you set a relative priority compared with other data. By default the randomly distributed data has a weight of 100, and special cases have a weight of 1.

Not everything is documented here and you should look around at the library to see what it supports. Feel free to modify it to add in new options but be careful because we want to maintain good data coverage.

To generate data from a list of *Gen instances or from a single StructGen you can use the gen_df function.

asserts.py and marks.py

asserts.py provides 2 APIs that let you run a command and verify that it produced the "same" result on both the CPU and the GPU.

assert_gpu_and_cpu_are_equal_collect and assert_gpu_and_cpu_are_equal_iterator the difference is in how the results are brought back for verification. Most tests should use the collect version. If you have a test that will produce a very large amount of data you can use the iterator version, but it will be much slower.

Each of these take a function as input. The function will be passed an instance of spark that is configured for the given environment and it should return a data frame. It also takes a dictionary of config entries that can be set when the test runs. Most config entries you care about should be through markers.

pytest uses markers to tag different tests with metadata. This framework uses them to be able to set various configs when running the tests. Markers were chosen because it provides a lot of flexibility even with parameterized tests.

The marks you care about are all in marks.py

  • ignore_order tells the asserts to sort the resulting data because the tests may not produce the results in the same order
  • incompat tells the tests to enable incompat operators. It does not enable approximate comparisons for floating point though.
  • approximate_float tells the tests to compare floating point values (including double) and allow for an error. This follows pytest.approx and will also take rel and abs args.
  • allow_non_gpu tells the tests that not everything in the query will run on the GPU. You can tell it to allow all CPU fallback by @allow_non_gpu(any=True) you can also pass in class names that are enabled for CPU operation.

spark_session.py

For the most part you can ignore this file. It provides the underlying Spark session to operations that need it, but most tests should interact with it through asserts.py.

Guidelines for Testing

When support for a new operator is added to the Rapids Accelerator for Spark, or when an existing operator is extended to support more data types, it is recommended that the following conditions be covered in its corresponding integration tests:

1. Cover all supported data types

Ensure that tests cover all data types supported by the added operation. An exhaustive list of data types supported in Apache Spark is available here. These include:

  • Numeric Types
    • ByteType
    • ShortType
    • IntegerType
    • LongType
    • FloatType
    • DoubleType
    • DecimalType
  • Strings
    • StringType
    • VarcharType
  • Binary (BinaryType)
  • Booleans (BooleanType)
  • Chrono Types
    • TimestampType
    • DateType
    • Interval
  • Complex Types
    • ArrayType
    • StructType
    • MapType

data_gen.py provides DataGen classes that help generate test data in integration tests.

The assert_gpu_and_cpu_are_equal_collect() function from asserts.py may be used to compare that an operator in the Rapids Accelerator produces the same results as Apache Spark, for a test query.

For data types that are not currently supported for an operator in the Rapids Accelerator, the assert_gpu_fallback_collect() function from asserts.py can be used to verify that the query falls back on the CPU operator from Apache Spark, and produces the right results.

2. Nested data types

Complex data types (ArrayType, StructType, MapType) warrant extensive testing for various combinations of nesting. E.g.

  • Array<primitive_type>
  • Array<Array<primitive_type>>
  • Array<Struct<primitive_type>>
  • Struct<Array<primitive_type>>
  • Array<Struct<Array<primitive_type>>>
  • Struct<Array<Struct<primitive_type>>>

The ArrayGen and StructGen classes in data_gen.py can be configured to support arbitrary nesting.

3. Literal (i.e. Scalar) values

Operators and expressions that support literal operands need to be tested with literal inputs, of all supported types from 1 and 2, above. For instance, SUM() supports numeric columns (e.g. SUM(a + b)), or scalars (e.g. SUM(20)). Similarly, COUNT() supports the following:

  • Columns: E.g. COUNT(a) to count non-null rows for column a
  • Scalars: E.g. COUNT(1) to count all rows (including nulls)
  • *: E.g. COUNT(*), functionally equivalent to COUNT(1) It is advised that tests be added for all applicable literal types, for an operator.

Note that for most operations, if all inputs are literal values, the Spark Catalyst optimizer will evaluate the expression during the logical planning phase of query compilation, via Constant Folding E.g. Consider this query:

SELECT SUM(1+2+3) FROM ...

The expression 1+2+3 will not be visible to the Rapids Accelerator for Apache Spark, because it will be evaluated at query compile time, before the Rapids Accelerator is invoked. Thus, adding multiple combinations of literal inputs need not necessarily add more test coverage.

4. Null values

Ensure that the test data accommodates null values for input columns. This includes null values in columns and in literal inputs.

Null values in input columns are a frequent source of bugs in the Rapids Accelerator for Spark, because of mismatches in null-handling and semantics, between RAPIDS libcudf (on which the Rapids Accelerator relies heavily), and Apache Spark.

Tests for aggregations (including group-by, reductions, and window aggregations) should cover cases where some rows are null, and where all input rows are null.

Apart from null rows in columns of primitive types, the following conditions must be covered for nested types:

  • Null rows at the "top" level for Array/Struct columns. E.g. [ [1,2], [3], ∅, [4,5,6] ].
  • Non-null rows containing null elements in the child column. E.g. [ [1,2], [3,∅], ∅, [4,∅,6] ].
  • All null rows at a nested level. E.g.
    • All null list rows: [ ∅, ∅, ∅, ∅ ]
    • All null elements within list rows: [ [∅,∅], [∅,∅], [∅,∅], [∅,∅] ]

The DataGen classes in integration_tests/src/main/python/data_gen.py can be configured to generate null values for the cases mentioned above.

5. Empty rows in Array columns

Operations on ArrayType columns must be tested with input columns containing non-null empty rows. E.g.

[
    [0,1,2,3],
    [], <------- Empty, non-null row.
    [4,5,6,7],
    ...
]

Using the ArrayGen data generator in integration_tests/src/main/python/data_gen.py will generate empty rows as mentioned above.

6. Degenerate cases with "empty" inputs

Ensure that operations are tested with "empty" input columns (i.e. containing zero rows.)

E.g. COUNT() on an empty input column yields 0. SUM() yields 0 for the appropriate numeric type.

7. Special floating point values

Apart from null values, FloatType and DoubleType input columns must also include the following special values:

  • +/- Zero
  • +/- Infinity
  • +/- NaN

Note that the special values for floating point numbers might have different bit representations for the same equivalent values. The Java documentation for longBitsToDouble() describes this with examples. Operations should be tested with multiple bit-representations for these special values.

The FloatGen and DoubleGen data generators in integration_tests/src/main/python/data_gen.py can be configured to generate the special float/double values mentioned above.

For most basic floating-point operations like addition, subtraction, multiplication, and division the plugin will produce a bit for bit identical result as Spark does. For some other functions (like sin, cos, etc.), the output may differ slightly, but remain within the rounding error inherent in floating-point calculations. Certain aggregations might compound those differences. In those cases, the @approximate_float test annotation may be used to mark tests to use "approximate" comparisons for floating-point values.

Refer to the "Floating Point" section of compatibility.md for details.

8. Special values in timestamp columns

Ensure date/timestamp columns include dates before the epoch.

Apache Spark supports dates/timetamps between 0001-01-01 00:00:00.000000 and 9999-12-31 23:59:59.999999, but at values close to the minimum value, the format used in Apache Spark causes rounding errors. To avoid such problems, it is recommended that the minimum value used in a test not actually equal 0001-01-01. For instance, 0001-01-03 is acceptable.

It is advised that DateGen and TimestampGen classes from data_gen.py be used to generate valid (proleptic Gregorian calendar) dates when testing operators that work on dates. This data generator respects the valid boundaries for dates and timestamps.