Split expensive pytest files in cases level [skip ci] #4336

Merged · 2 commits · Dec 13, 2021 · Changes from all commits
72 changes: 52 additions & 20 deletions jenkins/spark-tests.sh
@@ -114,7 +114,9 @@ export SPARK_TASK_MAXFAILURES=1
[[ "$IS_SPARK_311_OR_LATER" -eq "0" ]] && SPARK_TASK_MAXFAILURES=4

export PATH="$SPARK_HOME/bin:$SPARK_HOME/sbin:$PATH"

# enable worker cleanup to avoid "out of space" issues
# if a test fails, we abort instantly, so the failed executor's log is still left there for debugging
export SPARK_WORKER_OPTS="$SPARK_WORKER_OPTS -Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.interval=120 -Dspark.worker.cleanup.appDataTtl=60"
#stop and restart SPARK ETL
stop-slave.sh
stop-master.sh
@@ -137,18 +139,12 @@ export BASE_SPARK_SUBMIT_ARGS="$BASE_SPARK_SUBMIT_ARGS \
export SEQ_CONF="--executor-memory 16G \
--total-executor-cores 6"

# currently we hardcode the parallelism and configs based on our CI node's hardware specs,
# we can make it dynamically generated if this script is going to be used in other scenarios in the future
PARALLELISM=${PARALLELISM:-'4'}
MEMORY_FRACTION=$(python -c "print(1/($PARALLELISM + 0.2))")
export PARALLEL_CONF="--executor-memory 4G \
--total-executor-cores 1 \
--conf spark.executor.cores=1 \
--conf spark.task.cpus=1 \
--conf spark.rapids.sql.concurrentGpuTasks=1 \
--conf spark.rapids.memory.gpu.minAllocFraction=0 \
--conf spark.rapids.memory.gpu.allocFraction=${MEMORY_FRACTION} \
--conf spark.rapids.memory.gpu.maxAllocFraction=${MEMORY_FRACTION}"
--conf spark.rapids.memory.gpu.minAllocFraction=0"

export CUDF_UDF_TEST_ARGS="--conf spark.rapids.memory.gpu.allocFraction=0.1 \
--conf spark.rapids.memory.gpu.minAllocFraction=0 \
@@ -165,7 +161,7 @@ export SCRIPT_PATH="$(pwd -P)"
export TARGET_DIR="$SCRIPT_PATH/target"
mkdir -p $TARGET_DIR

run_test() {
run_test_not_parallel() {
local TEST=${1//\.py/}
local LOG_FILE
case $TEST in
@@ -190,7 +186,7 @@ run_test() {
LOG_FILE="$TARGET_DIR/$TEST.log"
# set dedicated RUN_DIRs here to avoid conflict between parallel tests
RUN_DIR="$TARGET_DIR/run_dir_$TEST" \
SPARK_SUBMIT_FLAGS="$BASE_SPARK_SUBMIT_ARGS $PARALLEL_CONF" \
SPARK_SUBMIT_FLAGS="$BASE_SPARK_SUBMIT_ARGS $PARALLEL_CONF $MEMORY_FRACTION_CONF" \
./run_pyspark_from_build.sh -k $TEST >"$LOG_FILE" 2>&1

CODE="$?"
@@ -204,7 +200,7 @@ run_test() {
;;
esac
}
export -f run_test
export -f run_test_not_parallel

# TEST_MODE
# - IT_ONLY
@@ -214,27 +210,63 @@ TEST_MODE=${TEST_MODE:-'IT_ONLY'}
if [[ $TEST_MODE == "ALL" || $TEST_MODE == "IT_ONLY" ]]; then
# integration tests
if [[ $PARALLEL_TEST == "true" ]] && [ -x "$(command -v parallel)" ]; then
# put the most time-consuming tests at the head of the queue
time_consuming_tests="hash_aggregate_test.py join_test.py generate_expr_test.py parquet_write_test.py"
tests_list=$(find "$SCRIPT_PATH"/src/main/python/ -name "*_test.py" -printf "%f ")
tests=$(echo "$time_consuming_tests $tests_list" | tr ' ' '\n' | awk '!x[$0]++' | xargs)
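The removed lines above deduplicate the combined list with the order-preserving `awk '!x[$0]++'` idiom, so the hand-picked slow tests stay at the head of the queue. A standalone sketch of that pipeline (hypothetical file names, not the real CI list):

```shell
# awk '!x[$0]++' prints each line only the first time it is seen,
# so the hand-picked slow tests keep their place at the head of the queue
time_consuming_tests="hash_aggregate_test.py join_test.py"
tests_list="join_test.py cast_test.py hash_aggregate_test.py"
tests=$(echo "$time_consuming_tests $tests_list" | tr ' ' '\n' | awk '!x[$0]++' | xargs)
echo "$tests"
# -> hash_aggregate_test.py join_test.py cast_test.py
```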
# We separate tests/cases into different categories for the parallel run, to try to avoid a long-tail distribution:
# time_consuming_tests: tests that would take over 1 hour if run sequentially; we split them into cases (time_consuming_tests_cases)
# mem_consuming_cases: cases in time_consuming_tests that consume much more GPU memory than normal cases
# other_tests: all tests except time_consuming_tests_cases and mem_consuming_cases

# TODO: Tag these tests/cases
# time-consuming tests, space-separated
time_consuming_tests="join_test hash_aggregate_test generate_expr_test parquet_write_test orc_test orc_write_test"
# GPU memory-consuming cases in time_consuming_tests, space-separated
mem_consuming_cases="test_hash_reduction_decimal_overflow_sum"
# hardcode parallelism as 2 for gpu-mem consuming cases
export MEMORY_FRACTION_CONF="--conf spark.rapids.memory.gpu.allocFraction=0.45 \
--conf spark.rapids.memory.gpu.maxAllocFraction=0.45"
# --halt "now,fail=1": exit when the first job fails, and kill the running jobs.
# we could set it to "never" and print the failed ones after all tests finish, if needed
# --group: print stderr after test finished for better readability
parallel --group --halt "now,fail=1" -j"${PARALLELISM}" run_test ::: $tests
parallel --group --halt "now,fail=1" -j2 run_test_not_parallel ::: ${mem_consuming_cases}
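The hard-coded `-j2` pairs with the 0.45 alloc fraction above: the two concurrent apps together claim at most 90% of GPU memory. A quick sanity check of that budget (a sketch, not part of the PR; assumes `python3` on PATH):

```shell
# two concurrent jobs at allocFraction=0.45 each commit 0.90 of GPU memory,
# leaving ~10% headroom for everything else on the device
jobs=2
fraction=0.45
python3 -c "assert $jobs * $fraction <= 1.0, 'GPU memory over-committed'"
python3 -c "print($jobs * $fraction)"
# -> 0.9
```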

time_consuming_tests_str=$(echo ${time_consuming_tests} | xargs | sed 's/ / or /g')
[Review thread]

Collaborator: Would it be better to have a tag for these tests?

Collaborator: Would also be good if we could document these things better somewhere.

pxLi (Author), Dec 10, 2021, replying to "Would it be better to have a tag for these tests?": Yes, a tag would be better here. Let me add a TODO, and I will add some tags here if we see more cases in this category.
mem_consuming_cases_str=$(echo ${mem_consuming_cases} | xargs | sed 's/ / and not /g')
time_consuming_tests_cases=$(./run_pyspark_from_build.sh -k \
"(${time_consuming_tests_str}) and not ${mem_consuming_cases_str}" \
--collect-only -qq 2>/dev/null | grep -oP '(?<=::).*?(?=\[)' | uniq | shuf | xargs)
other_tests=$(./run_pyspark_from_build.sh --collect-only -qqq 2>/dev/null | grep -oP '(?<=python/).*?(?=.py)' \
| grep -vP "$(echo ${time_consuming_tests} | xargs | tr ' ' '|')")
tests=$(echo "${time_consuming_tests_cases} ${other_tests}" | tr ' ' '\n' | awk '!x[$0]++' | xargs)
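The pytest `-k` expressions above are assembled with `sed` ("a b" becomes "a or b" for inclusion, "a and not b" for exclusion), and individual case names are scraped from `--collect-only` output with a PCRE lookaround. A standalone sketch of just the text processing (hypothetical test names; `grep -P` assumes GNU grep):

```shell
# join names with " or " for the include filter, " and not " for the exclude filter
time_consuming_tests="join_test hash_aggregate_test"
mem_consuming_cases="test_hash_reduction_decimal_overflow_sum"
include=$(echo ${time_consuming_tests} | xargs | sed 's/ / or /g')
exclude=$(echo ${mem_consuming_cases} | xargs | sed 's/ / and not /g')
echo "(${include}) and not ${exclude}"
# -> (join_test or hash_aggregate_test) and not test_hash_reduction_decimal_overflow_sum

# a pytest node id like 'join_test.py::test_sortmerge_join[100]' yields the bare case name:
# lookbehind (?<=::) anchors after '::', non-greedy match stops before the '[' of the param id
echo 'join_test.py::test_sortmerge_join[100]' | grep -oP '(?<=::).*?(?=\[)'
# -> test_sortmerge_join
```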

if [[ "${PARALLELISM}" == "" ]]; then
PARALLELISM=$(nvidia-smi --query-gpu=memory.free --format=csv,noheader | \
awk '{if (MAX < $1){ MAX = $1}} END {print int(MAX / (2 * 1024))}')
fi
MEMORY_FRACTION=$(python -c "print(1/($PARALLELISM + 0.1))")
export MEMORY_FRACTION_CONF="--conf spark.rapids.memory.gpu.allocFraction=${MEMORY_FRACTION} \
--conf spark.rapids.memory.gpu.maxAllocFraction=${MEMORY_FRACTION}"
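The dynamic sizing above allots one worker per 2 GiB of free GPU memory, and each worker then gets just under 1/PARALLELISM of the GPU via allocFraction (the +0.1 leaves headroom). A self-contained sketch of both calculations, with a hard-coded sample reading in place of real `nvidia-smi` output (assumes `python3` on PATH):

```shell
# sample free-memory reading, shaped like the output of
# `nvidia-smi --query-gpu=memory.free --format=csv,noheader` (hypothetical value)
free_mem="16384 MiB"

# one parallel worker per 2 GiB of free memory on the largest GPU
PARALLELISM=$(echo "$free_mem" | awk '{if (MAX < $1) {MAX = $1}} END {print int(MAX / (2 * 1024))}')
echo "$PARALLELISM"
# -> 8

# each worker gets a bit less than 1/PARALLELISM of GPU memory
MEMORY_FRACTION=$(python3 -c "print(round(1/($PARALLELISM + 0.1), 4))")
echo "$MEMORY_FRACTION"
# -> 0.1235
```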
parallel --group --halt "now,fail=1" -j"${PARALLELISM}" run_test_not_parallel ::: $tests
else
run_test all
run_test_not_parallel all
fi

# Temporarily only run on Spark 3.1.1 (https://github.com/NVIDIA/spark-rapids/issues/3311)
if [[ "$IS_SPARK_311_OR_LATER" -eq "1" ]]; then
run_test cache_serializer
if [[ $PARALLEL_TEST == "true" ]] && [ -x "$(command -v parallel)" ]; then
cache_test_cases=$(./run_pyspark_from_build.sh -k "cache_test" \
--collect-only -qq 2>/dev/null | grep -oP '(?<=::).*?(?=\[)' | uniq | shuf | xargs)
# hardcode parallelism as 5
export MEMORY_FRACTION_CONF="--conf spark.rapids.memory.gpu.allocFraction=0.18 \
--conf spark.rapids.memory.gpu.maxAllocFraction=0.18 \
--conf spark.sql.cache.serializer=com.nvidia.spark.ParquetCachedBatchSerializer"
parallel --group --halt "now,fail=1" -j5 run_test_not_parallel ::: ${cache_test_cases}
else
run_test_not_parallel cache_serializer
fi
fi
fi

# cudf_udf_test
if [[ "$TEST_MODE" == "ALL" || "$TEST_MODE" == "CUDF_UDF_ONLY" ]]; then
run_test cudf_udf_test
run_test_not_parallel cudf_udf_test
fi

popd