Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split window exec into multiple stages if needed #2845

Merged
merged 2 commits into from
Jun 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion integration_tests/src/main/python/asserts.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,20 +401,26 @@ def assert_gpu_and_cpu_row_counts_equal(func, conf={}, is_cpu_first=True):
"""
_assert_gpu_and_cpu_are_equal(func, 'COUNT', conf=conf, is_cpu_first=is_cpu_first)

def assert_gpu_and_cpu_are_equal_sql(df_fun, table_name, sql, conf=None, debug=False, is_cpu_first=True):
def assert_gpu_and_cpu_are_equal_sql(df_fun, table_name, sql, conf=None, debug=False, is_cpu_first=True, validate_execs_in_gpu_plan=[]):
"""
Assert that the specified SQL query produces equal results on CPU and GPU.
:param df_fun: a function that will create the dataframe
:param table_name: Name of table to be created with the dataframe
:param sql: SQL query to be run on the specified table
:param conf: Any user-specified confs. Empty by default.
:param debug: Boolean to indicate if the SQL output should be printed
:param is_cpu_first: Boolean to indicate if the CPU should be run first or not
:param validate_execs_in_gpu_plan: String list of expressions to be validated in the GPU plan.
:return: Assertion failure, if results from CPU and GPU do not match.
"""
if conf is None:
conf = {}
def do_it_all(spark):
df = df_fun(spark)
df.createOrReplaceTempView(table_name)
# we hold off on setting the validate execs until after creating the temp view

spark.conf.set('spark.rapids.sql.test.validateExecsInGpuPlan', ','.join(validate_execs_in_gpu_plan))
if debug:
return data_gen.debug_df(spark.sql(sql))
else:
Expand Down
5 changes: 4 additions & 1 deletion integration_tests/src/main/python/window_function_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ def test_window_running_no_part(b_gen, batch_size):
'select ' +
', '.join(query_parts) +
' from window_agg_table ',
validate_execs_in_gpu_plan = ['GpuRunningWindowExec'],
conf = conf)

# This is for aggregations that work with a running window optimization. They don't need to be batched
Expand All @@ -319,6 +320,7 @@ def test_window_running(b_gen, c_gen, batch_size):
'select ' +
', '.join(query_parts) +
' from window_agg_table ',
validate_execs_in_gpu_plan = ['GpuRunningWindowExec'],
conf = conf)

@ignore_order
Expand Down Expand Up @@ -527,6 +529,8 @@ def test_window_aggs_for_rows_collect_list():

# SortExec does not support array type, so sort the result locally.
@ignore_order(local=True)
# This test is more directed at Databricks and their running window optimization instead of ours
# this is why we do not validate that we inserted in a GpuRunningWindowExec, yet.
def test_running_window_function_exec_for_all_aggs():
assert_gpu_and_cpu_are_equal_sql(
lambda spark : gen_df(spark, _gen_data_for_collect_list),
Expand Down Expand Up @@ -554,7 +558,6 @@ def test_running_window_function_exec_for_all_aggs():
from window_collect_table
''')


# Generates some repeated values to test the deduplication of GpuCollectSet.
# And GpuCollectSet does not yet support struct type.
_gen_data_for_collect_set = [
Expand Down
Loading