Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trim cache tests to improve integration test time [databricks] #4838

Merged
merged 1 commit into from
Feb 23, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 4 additions & 47 deletions integration_tests/src/main/python/cache_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,75 +59,32 @@ def test_passing_gpuExpr_as_Expr(enable_vectorized_conf):
BooleanGen(), DateGen(), TimestampGen()] + _cache_decimal_gens

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('enable_vectorized_conf', enable_vectorized_confs, ids=idfn)
@ignore_order
def test_cache_join(data_gen, enable_vectorized_conf):
    """Cache a joined DataFrame and check CPU and GPU produce the same rows.

    Only the 'Inner' join type is exercised: caching behavior does not depend
    on the join type, so the former per-join-type parametrization only
    multiplied integration-test time.
    """
    def do_join(spark):
        left, right = create_df(spark, data_gen, 500, 500)
        cached = left.join(right, left.a == right.r_a, 'Inner').cache()
        cached.count()  # populates cache
        return cached
    assert_gpu_and_cpu_are_equal_collect(do_join, conf=enable_vectorized_conf)

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('enable_vectorized_conf', enable_vectorized_confs, ids=idfn)
# We are OK running everything on CPU until we complete 'https://github.com/NVIDIA/spark-rapids/issues/360'
# because we have an explicit check in our code that disallows InMemoryTableScan to have anything other than
# AttributeReference
@allow_non_gpu(any=True)
@ignore_order
def test_cached_join_filter(data_gen, enable_vectorized_conf):
    """Cache a joined DataFrame, then filter the cached result.

    Verifies that a filter applied on top of an InMemoryTableScan returns the
    same rows on CPU and GPU. Uses only the 'Inner' join type — cache
    correctness does not depend on the join type.
    """
    data = data_gen
    def do_join(spark):
        left, right = create_df(spark, data, 500, 500)
        cached = left.join(right, left.a == right.r_a, 'Inner').cache()
        cached.count()  # populates the cache
        return cached.filter("a is not null")
    assert_gpu_and_cpu_are_equal_collect(do_join, conf=enable_vectorized_conf)

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('enable_vectorized_conf', enable_vectorized_confs, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
@ignore_order
def test_cache_broadcast_hash_join(data_gen, join_type, enable_vectorized_conf):
    """Join with a broadcast hint, cache the result, and compare CPU vs GPU."""
    def build_cached(spark):
        lhs, rhs = create_df(spark, data_gen, 500, 500)
        joined = lhs.join(rhs.hint("broadcast"), lhs.a == rhs.r_a, join_type)
        result = joined.cache()
        result.count()  # materialize the cache before collecting
        return result

    assert_gpu_and_cpu_are_equal_collect(build_cached, conf=enable_vectorized_conf)

# Spark conf that steers the planner toward shuffled hash joins: a tiny
# broadcast threshold, sort-merge joins disabled, and few shuffle partitions.
shuffled_conf = {
    "spark.sql.autoBroadcastJoinThreshold": "160",
    "spark.sql.join.preferSortMergeJoin": "false",
    "spark.sql.shuffle.partitions": "2",
}

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('enable_vectorized_conf', enable_vectorized_confs, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
@ignore_order
def test_cache_shuffled_hash_join(data_gen, join_type, enable_vectorized_conf):
    """Cache the result of a join on asymmetric inputs (50 x 500 rows)."""
    def build_cached(spark):
        small_df, big_df = create_df(spark, data_gen, 50, 500)
        result = small_df.join(big_df, small_df.a == big_df.r_a, join_type).cache()
        result.count()  # materialize the cache before collecting
        return result
    assert_gpu_and_cpu_are_equal_collect(build_cached, conf=enable_vectorized_conf)

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('enable_vectorized_conf', enable_vectorized_confs, ids=idfn)
@ignore_order
def test_cache_broadcast_nested_loop_join(data_gen, enable_vectorized_conf):
    """Cache a cross join (broadcast nested loop join) and compare CPU vs GPU.

    The former 'join_type' parametrization was dropped: crossJoin takes no
    join type, so the parameter was never used and only re-ran the identical
    test five times.
    """
    def do_join(spark):
        left, right = create_df(spark, data_gen, 50, 25)
        cached = left.crossJoin(right.hint("broadcast")).cache()
        cached.count()  # populates the cache
        return cached
    assert_gpu_and_cpu_are_equal_collect(do_join, conf=enable_vectorized_conf)

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('enable_vectorized_conf', enable_vectorized_confs, ids=idfn)
@ignore_order
Expand Down