
Join support for DecimalType #1475

Merged: 3 commits, Jan 8, 2021
35 changes: 23 additions & 12 deletions integration_tests/src/main/python/join_test.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2020, NVIDIA CORPORATION.
+# Copyright (c) 2020-2021, NVIDIA CORPORATION.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -23,7 +23,9 @@
all_gen = [StringGen(), ByteGen(), ShortGen(), IntegerGen(), LongGen(),
Collaborator:

It's better to use the `all_gen` in data_gen.py and add the missing generators.

Collaborator:

I really hate `all_gen` being in data_gen.py, because it is not really generating everything ("all"). I let it through before, so I didn't say anything when it was reused, but if you do use it I would want it renamed so it is clear what is in it.

Collaborator (Author):

`all_gen` in data_gen.py is missing these two gens: `null_gen` and `decimal_gen_neg_scale`.
Would one of the following be an acceptable name if we rename it for better readability?

1. `all_gen_minus_null_gen_and_decimal_gen_neg_scale` or `all_gen_excluding_null_gen_and_decimal_gen_neg_scale` (the problem I see with this is that we may have to rename it again if we later add another gen that is not included in this variable). Suggestions on the name are welcome :) .

Or would you prefer to keep `all_gen` local to the test files and remove it from data_gen.py? (I can create a follow-on PR for that, as it involves changing other files.)

Collaborator (@razajafri, Jan 8, 2021):

I can't think of a suitable name. @jlowe is better at naming things; maybe he has something.

The right thing to do, in my mind, would be to add everything to `all_gen` and let tests create local versions of the list if they need something different. Since adding everything to `all_gen` might break tests, let's create a follow-on for this as a lower-priority task.

     BooleanGen(), DateGen(), TimestampGen(), null_gen,
     pytest.param(FloatGen(), marks=[incompat]),
-    pytest.param(DoubleGen(), marks=[incompat])]
+    pytest.param(DoubleGen(), marks=[incompat]),
+    decimal_gen_default, decimal_gen_scale_precision, decimal_gen_same_scale_precision,
+    decimal_gen_neg_scale, decimal_gen_64bit]
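The `decimal_gen_*` entries added above come from the suite's data_gen.py. As a rough illustration of what such a generator does, here is a hypothetical, simplified stand-in (not the plugin's actual `DecimalGen` implementation): it yields random `Decimal` values for a fixed precision and scale, where the scale may be negative.

```python
import random
from decimal import Decimal

class SimpleDecimalGen:
    """Toy stand-in for the test suite's DecimalGen: yields random
    Decimal values with a fixed precision and scale. A negative scale
    is allowed, which is why the tests set
    spark.sql.legacy.allowNegativeScaleOfDecimal."""

    def __init__(self, precision=7, scale=3, nullable=True, seed=0):
        self.precision = precision
        self.scale = scale
        self.nullable = nullable
        self._rng = random.Random(seed)

    def generate(self, length):
        max_unscaled = 10 ** self.precision - 1
        out = []
        for _ in range(length):
            if self.nullable and self._rng.random() < 0.1:
                out.append(None)
            else:
                unscaled = self._rng.randint(-max_unscaled, max_unscaled)
                # value = unscaled * 10**(-scale); a negative scale
                # shifts the decimal point left, e.g. scale=-2 -> x100
                out.append(Decimal(unscaled).scaleb(-self.scale))
        return out
```

With `scale=-2`, every generated value is a multiple of 100, which is exactly the kind of value Spark rejects unless `allowNegativeScaleOfDecimal` is enabled.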

all_gen_no_nulls = [StringGen(nullable=False), ByteGen(nullable=False),
ShortGen(nullable=False), IntegerGen(nullable=False), LongGen(nullable=False),
@@ -35,9 +37,18 @@

 _sortmerge_join_conf = {'spark.sql.autoBroadcastJoinThreshold': '-1',
                         'spark.sql.join.preferSortMergeJoin': 'True',
-                        'spark.sql.shuffle.partitions': '2'
+                        'spark.sql.shuffle.partitions': '2',
+                        'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true'
                         }

+_cartesean_join_conf = {'spark.rapids.sql.exec.CartesianProductExec': 'true',
+                        'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true'
+                        }
+
+_broadcastnestedloop_join_conf = {'spark.rapids.sql.exec.BroadcastNestedLoopJoinExec': 'true',
+                                  'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true'
+                                  }

def create_df(spark, data_gen, left_length, right_length):
left = binary_op_df(spark, data_gen, length=left_length)
right = binary_op_df(spark, data_gen, length=right_length).withColumnRenamed("a", "r_a")\
@@ -67,7 +78,7 @@ def test_broadcast_join_right_table(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 500, 250)
return left.join(broadcast(right), left.a == right.r_a, join_type)
-    assert_gpu_and_cpu_are_equal_collect(do_join)
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@@ -77,7 +88,7 @@ def test_cartesean_join(data_gen):
def do_join(spark):
left, right = create_df(spark, data_gen, 50, 25)
return left.crossJoin(right)
-    assert_gpu_and_cpu_are_equal_collect(do_join, conf={'spark.rapids.sql.exec.CartesianProductExec': 'true'})
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=_cartesean_join_conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@@ -89,7 +100,7 @@ def test_cartesean_join_special_case(data_gen):
def do_join(spark):
left, right = create_df(spark, data_gen, 500, 250)
return left.crossJoin(right).selectExpr('COUNT(*)')
-    assert_gpu_and_cpu_are_equal_collect(do_join, conf={'spark.rapids.sql.exec.CartesianProductExec': 'true'})
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=_cartesean_join_conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@@ -99,7 +110,7 @@ def test_broadcast_nested_loop_join(data_gen):
def do_join(spark):
left, right = create_df(spark, data_gen, 50, 25)
return left.crossJoin(broadcast(right))
-    assert_gpu_and_cpu_are_equal_collect(do_join, conf={'spark.rapids.sql.exec.BroadcastNestedLoopJoinExec': 'true'})
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=_broadcastnestedloop_join_conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@@ -109,7 +120,7 @@ def test_broadcast_nested_loop_join_special_case(data_gen):
def do_join(spark):
left, right = create_df(spark, data_gen, 50, 25)
return left.crossJoin(broadcast(right)).selectExpr('COUNT(*)')
-    assert_gpu_and_cpu_are_equal_collect(do_join, conf={'spark.rapids.sql.exec.BroadcastNestedLoopJoinExec': 'true'})
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=_broadcastnestedloop_join_conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@@ -125,7 +136,7 @@ def do_join(spark):
# that do not expose the error
return left.join(broadcast(right),
(left.b >= right.r_b), join_type)
-    assert_gpu_and_cpu_are_equal_collect(do_join, conf={'spark.rapids.sql.exec.BroadcastNestedLoopJoinExec': 'true'})
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=_broadcastnestedloop_join_conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@@ -138,7 +149,7 @@ def test_broadcast_join_left_table(data_gen, join_type):
def do_join(spark):
left, right = create_df(spark, data_gen, 250, 500)
return broadcast(left).join(right, left.a == right.r_a, join_type)
-    assert_gpu_and_cpu_are_equal_collect(do_join)
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf)

# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@@ -150,7 +161,7 @@ def do_join(spark):
left, right = create_df(spark, data_gen, 500, 250)
return left.join(broadcast(right),
(left.a == right.r_a) & (left.b >= right.r_b), join_type)
-    assert_gpu_and_cpu_are_equal_collect(do_join)
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf)


_mixed_df1_with_nulls = [('a', RepeatSeqGen(LongGen(nullable=(True, 20.0)), length= 10)),
@@ -170,7 +181,7 @@ def do_join(spark):
right = gen_df(spark, _mixed_df2_with_nulls, length=500).withColumnRenamed("a", "r_a")\
.withColumnRenamed("b", "r_b").withColumnRenamed("c", "r_c")
return left.join(broadcast(right), left.a.eqNullSafe(right.r_a), join_type)
-    assert_gpu_and_cpu_are_equal_collect(do_join)
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf)
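The test above joins on `eqNullSafe`, Spark's null-safe equality (`<=>`), where `NULL <=> NULL` is true, unlike ordinary `==`. A tiny pure-Python sketch of those semantics (illustrative only; `join_null_safe` is a hypothetical helper, not part of the test suite's API):

```python
def eq_null_safe(a, b):
    """Mimics Spark's eqNullSafe (<=>): two NULLs compare equal,
    and NULL never equals a non-NULL value."""
    if a is None and b is None:
        return True
    if a is None or b is None:
        return False
    return a == b

def join_null_safe(left, right):
    """Tiny nested-loop inner join on null-safe key equality; purely
    illustrative of the semantics a GPU join must reproduce."""
    return [(l, r) for l in left for r in right if eq_null_safe(l, r)]
```

With an ordinary `==` join condition, rows whose key is NULL would never match; with `<=>` they pair up, which is exactly what the mixed-nulls test exercises.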

@ignore_order
@allow_non_gpu('DataWritingCommandExec')
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2020, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2021, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -179,15 +179,15 @@ class Spark300Shims extends SparkShims {
}),
GpuOverrides.exec[SortMergeJoinExec](
"Sort merge join, replacing with shuffled hash join",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all),
+      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
(join, conf, p, r) => new GpuSortMergeJoinMeta(join, conf, p, r)),
GpuOverrides.exec[BroadcastHashJoinExec](
"Implementation of join using broadcast data",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all),
+      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
(join, conf, p, r) => new GpuBroadcastHashJoinMeta(join, conf, p, r)),
GpuOverrides.exec[ShuffledHashJoinExec](
"Implementation of join using hashed shuffled data",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all),
+      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
(join, conf, p, r) => new GpuShuffledHashJoinMeta(join, conf, p, r))
).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap
}
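Each `ExecChecks` change above appends `TypeSig.DECIMAL` to the exec's supported input signature with `+`. A loose Python analogue of such a composable type signature (a hypothetical model for illustration, not the plugin's Scala API):

```python
class TypeSig:
    """Minimal model of a set of supported SQL types that can be
    combined with '+', loosely mirroring the Scala TypeSig usage."""

    def __init__(self, types):
        self.types = frozenset(types)

    def __add__(self, other):
        # Combining two signatures supports the union of their types.
        return TypeSig(self.types | other.types)

    def supports(self, type_name):
        return type_name in self.types

# Illustrative stand-ins for TypeSig.commonCudfTypes, NULL, and DECIMAL.
COMMON_CUDF = TypeSig({'int', 'long', 'double', 'string', 'timestamp'})
NULL = TypeSig({'null'})
DECIMAL = TypeSig({'decimal'})

# Before this PR the joins advertised COMMON_CUDF + NULL; now they
# also advertise DECIMAL, so decimal columns stay on the GPU plan.
join_checks = COMMON_CUDF + NULL + DECIMAL
```

The point of the pattern is that adding a newly supported type is a one-token change at each exec's registration site, while the fallback logic that consults `supports` stays untouched.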
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2020, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2021, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -42,15 +42,15 @@ class Spark301Shims extends Spark300Shims {
super.getExecs ++ Seq(
GpuOverrides.exec[SortMergeJoinExec](
"Sort merge join, replacing with shuffled hash join",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all),
+      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
(join, conf, p, r) => new GpuSortMergeJoinMeta(join, conf, p, r)),
GpuOverrides.exec[BroadcastHashJoinExec](
"Implementation of join using broadcast data",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all),
+      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
(join, conf, p, r) => new GpuBroadcastHashJoinMeta(join, conf, p, r)),
GpuOverrides.exec[ShuffledHashJoinExec](
"Implementation of join using hashed shuffled data",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all),
+      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
(join, conf, p, r) => new GpuShuffledHashJoinMeta(join, conf, p, r))
).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r))
}
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2020, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2021, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -139,15 +139,15 @@ class Spark301dbShims extends Spark301Shims {
}),
GpuOverrides.exec[SortMergeJoinExec](
"Sort merge join, replacing with shuffled hash join",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all),
+      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
(join, conf, p, r) => new GpuSortMergeJoinMeta(join, conf, p, r)),
GpuOverrides.exec[BroadcastHashJoinExec](
"Implementation of join using broadcast data",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all),
+      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
(join, conf, p, r) => new GpuBroadcastHashJoinMeta(join, conf, p, r)),
GpuOverrides.exec[ShuffledHashJoinExec](
"Implementation of join using hashed shuffled data",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all),
+      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
(join, conf, p, r) => new GpuShuffledHashJoinMeta(join, conf, p, r))
).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap
}
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2020, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2021, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -249,15 +249,15 @@ class Spark310Shims extends Spark301Shims {
}),
GpuOverrides.exec[SortMergeJoinExec](
"Sort merge join, replacing with shuffled hash join",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all),
+      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
(join, conf, p, r) => new GpuSortMergeJoinMeta(join, conf, p, r)),
GpuOverrides.exec[BroadcastHashJoinExec](
"Implementation of join using broadcast data",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all),
+      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
(join, conf, p, r) => new GpuBroadcastHashJoinMeta(join, conf, p, r)),
GpuOverrides.exec[ShuffledHashJoinExec](
"Implementation of join using hashed shuffled data",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all),
+      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
(join, conf, p, r) => new GpuShuffledHashJoinMeta(join, conf, p, r))
).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r))
}
@@ -2312,16 +2312,16 @@ object GpuOverrides {
}),
exec[BroadcastExchangeExec](
"The backend for broadcast exchange of data",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all),
+      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
(exchange, conf, p, r) => new GpuBroadcastMeta(exchange, conf, p, r)),
exec[BroadcastNestedLoopJoinExec](
"Implementation of join using brute force",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all),
+      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
(join, conf, p, r) => new GpuBroadcastNestedLoopJoinMeta(join, conf, p, r))
.disabledByDefault("large joins can cause out of memory errors"),
exec[CartesianProductExec](
"Implementation of join using brute force",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL, TypeSig.all),
+      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
(join, conf, p, r) => new SparkPlanMeta[CartesianProductExec](join, conf, p, r) {
val condition: Option[BaseExprMeta[_]] =
join.condition.map(GpuOverrides.wrapExpr(_, conf, Some(this)))