[BUG] Hash Partitioning can fail for very small batches #2279

Closed
revans2 opened this issue Apr 28, 2021 · 3 comments · Fixed by #2319
Assignees: revans2
Labels: bug (Something isn't working), cudf_dependency (An issue or PR with this label depends on a new feature in cudf), P0 (Must have for release)

Comments


revans2 commented Apr 28, 2021

I am not 100% sure what the issue is, but we can get an invalid partition range for hash partitioning in some cases with the following patch.

This also triggers #2278, so only look at the following failures:

FAILED ../../src/main/python/join_test.py::test_sortmerge_join_array[100-Left-Array(String)][IGNORE_ORDER({'local': True})]
FAILED ../../src/main/python/join_test.py::test_sortmerge_join_array[100-Right-Array(String)][IGNORE_ORDER({'local': True})]
FAILED ../../src/main/python/join_test.py::test_sortmerge_join_array[100-Inner-Array(String)][IGNORE_ORDER({'local': True})]
FAILED ../../src/main/python/join_test.py::test_sortmerge_join_array[100-LeftSemi-Array(String)][IGNORE_ORDER({'local': True})]
FAILED ../../src/main/python/join_test.py::test_sortmerge_join_array[100-Cross-Array(String)][IGNORE_ORDER({'local': True})]

diff --git a/integration_tests/src/main/python/join_test.py b/integration_tests/src/main/python/join_test.py
index 745ba40dd..ee4e75dac 100644
--- a/integration_tests/src/main/python/join_test.py
+++ b/integration_tests/src/main/python/join_test.py
@@ -81,20 +81,26 @@ def create_nested_df(spark, key_data_gen, data_gen, left_length, right_length):
 @ignore_order(local=True)
 @pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
 @pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross', 'FullOuter'], ids=idfn)
-def test_sortmerge_join(data_gen, join_type):
+@pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test out of core joins too
+def test_sortmerge_join(data_gen, join_type, batch_size):
     def do_join(spark):
         left, right = create_df(spark, data_gen, 500, 500)
         return left.join(right, left.a == right.r_a, join_type)
-    assert_gpu_and_cpu_are_equal_collect(do_join, conf=_sortmerge_join_conf)
+    conf = {'spark.rapids.sql.batchSizeBytes': batch_size}
+    conf.update(_sortmerge_join_conf)
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)
 
 @ignore_order(local=True)
 @pytest.mark.parametrize('data_gen', single_level_array_gens_no_decimal, ids=idfn)
 @pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross', 'FullOuter'], ids=idfn)
-def test_sortmerge_join_array(data_gen, join_type):
+@pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test out of core joins too
+def test_sortmerge_join_array(data_gen, join_type, batch_size):
     def do_join(spark):
         left, right = create_nested_df(spark, short_gen, data_gen, 500, 500)
         return left.join(right, left.key == right.r_key, join_type)
-    assert_gpu_and_cpu_are_equal_collect(do_join, conf=_sortmerge_join_conf)
+    conf = {'spark.rapids.sql.batchSizeBytes': batch_size}
+    conf.update(_sortmerge_join_conf)
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)
 
 @allow_non_gpu('SortMergeJoinExec', 'SortExec', 'KnownFloatingPointNormalized', 'ArrayTransform', 'LambdaFunction', 'NamedLambdaVariable', 'NormalizeNaNAndZero')
 @ignore_order(local=True)
@@ -109,11 +115,14 @@ def test_sortmerge_join_array_as_key(data_gen, join_type):
 @ignore_order(local=True)
 @pytest.mark.parametrize('data_gen', [all_basic_struct_gen], ids=idfn)
 @pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross', 'FullOuter'], ids=idfn)
-def test_sortmerge_join_struct(data_gen, join_type):
+@pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test out of core joins too
+def test_sortmerge_join_struct(data_gen, join_type, batch_size):
     def do_join(spark):
         left, right = create_nested_df(spark, short_gen, data_gen, 500, 500)
         return left.join(right, left.key == right.r_key, join_type)
-    assert_gpu_and_cpu_are_equal_collect(do_join, conf=_sortmerge_join_conf)
+    conf = {'spark.rapids.sql.batchSizeBytes': batch_size}
+    conf.update(_sortmerge_join_conf)
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)
 
 @allow_non_gpu('SortMergeJoinExec', 'SortExec', 'KnownFloatingPointNormalized', 'NormalizeNaNAndZero', 'CreateNamedStruct', 'GetStructField', 'Literal', 'If', 'IsNull')
 @ignore_order(local=True)

The exception I get back is

21/04/28 13:24:55 ERROR Executor: Exception in task 0.0 in stage 1743.0 (TID 2316)
ai.rapids.cudf.CudfException: cuDF failure at: ../include/cudf/strings/detail/utilities.cuh:52: Invalid iterator range
        at ai.rapids.cudf.Table.partition(Native Method)
        at ai.rapids.cudf.Table.partition(Table.java:1294)
        at com.nvidia.spark.rapids.GpuHashPartitioning.$anonfun$partitionInternalAndClose$6(GpuHashPartitioning.scala:61)
        at com.nvidia.spark.rapids.Arm.withResource(Arm.scala:28)
        at com.nvidia.spark.rapids.Arm.withResource$(Arm.scala:26)
        at com.nvidia.spark.rapids.GpuHashPartitioning.withResource(GpuHashPartitioning.scala:28)
        at com.nvidia.spark.rapids.GpuHashPartitioning.$anonfun$partitionInternalAndClose$5(GpuHashPartitioning.scala:60)
        at com.nvidia.spark.rapids.Arm.withResource(Arm.scala:28)
        at com.nvidia.spark.rapids.Arm.withResource$(Arm.scala:26)
        at com.nvidia.spark.rapids.GpuHashPartitioning.withResource(GpuHashPartitioning.scala:28)
        at com.nvidia.spark.rapids.GpuHashPartitioning.$anonfun$partitionInternalAndClose$1(GpuHashPartitioning.scala:59)
        at com.nvidia.spark.rapids.Arm.withResource(Arm.scala:28)
        at com.nvidia.spark.rapids.Arm.withResource$(Arm.scala:26)
        at com.nvidia.spark.rapids.GpuHashPartitioning.withResource(GpuHashPartitioning.scala:28)
        at com.nvidia.spark.rapids.GpuHashPartitioning.partitionInternalAndClose(GpuHashPartitioning.scala:51)
        at com.nvidia.spark.rapids.GpuHashPartitioning.$anonfun$columnarEval$2(GpuHashPartitioning.scala:82)
        at com.nvidia.spark.rapids.Arm.withResource(Arm.scala:28)
        at com.nvidia.spark.rapids.Arm.withResource$(Arm.scala:26)
        at com.nvidia.spark.rapids.GpuHashPartitioning.withResource(GpuHashPartitioning.scala:28)
        at com.nvidia.spark.rapids.GpuHashPartitioning.$anonfun$columnarEval$1(GpuHashPartitioning.scala:81)
        at com.nvidia.spark.rapids.Arm.withResource(Arm.scala:28)
        at com.nvidia.spark.rapids.Arm.withResource$(Arm.scala:26)
        at com.nvidia.spark.rapids.GpuHashPartitioning.withResource(GpuHashPartitioning.scala:28)
        at com.nvidia.spark.rapids.GpuHashPartitioning.columnarEval(GpuHashPartitioning.scala:78)
        at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExec$.$anonfun$prepareBatchShuffleDependency$3(GpuShuffleExchangeExec.scala:203)
        at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExec$$anon$1.partNextBatch(GpuShuffleExchangeExec.scala:224)
        at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExec$$anon$1.hasNext(GpuShuffleExchangeExec.scala:235)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:155)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

I'm not sure whether this is an issue in cudf or something on our side.

@revans2 revans2 added bug Something isn't working P0 Must have for release labels Apr 28, 2021
@revans2 revans2 self-assigned this Apr 28, 2021

revans2 commented Apr 28, 2021

This looks like it is an issue in cudf. I have been able to reproduce it in a very simple case.

import java.util.Arrays
import ai.rapids.cudf.{ColumnVector, DType, HostColumnVector, Table}

// One LIST<STRING> column with two rows, an empty list and a null, so the child string column ends up with zero rows.
val listOfStringType = new HostColumnVector.ListType(true, new HostColumnVector.BasicType(true, DType.STRING))
val v = ColumnVector.fromLists(listOfStringType, Arrays.asList(), null)
val t = new Table(v)
val parts = ColumnVector.fromInts(0, 0)
val tmp = t.partition(parts, 2)
ai.rapids.cudf.CudfException: cuDF failure at: ../include/cudf/strings/detail/utilities.cuh:52: Invalid iterator range
  at ai.rapids.cudf.Table.partition(Native Method)
  at ai.rapids.cudf.Table.partition(Table.java:1294)
  ... 49 elided

It does not happen with just a gather, so it appears to be related to how partition works. I'll try to come up with a C++ repro case and file something in cudf.
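
As a hedged sketch of that comparison (reusing t and the imports from the repro above; this snippet is not from the original comment), a plain gather over the same table is the kind of call that goes through cleanly:

// Gathering both rows of the same all-empty/all-null LIST<STRING> column does not
// raise the error, which is what points at partition rather than the column layout.
val gatherMap = ColumnVector.fromInts(0, 1)
val gathered = t.gather(gatherMap)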

I am not sure if this needs to stop the 0.5 release. It appears to be an issue only when the column we are partitioning is a list of something nested, like strings or other lists, and it is made up of all nulls or all empty lists (which leaves one of the leaf data columns with zero length).
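
To illustrate that condition, here is a hedged, untested variant of the repro above (not from the original report): give any row a non-empty list so the leaf string column has at least one row, and per the description above the same partition call should no longer hit the invalid iterator range.

// Hypothetical contrast case: one list holds a string, so the child string
// column of the LIST<STRING> column is non-empty.
val okVec = ColumnVector.fromLists(listOfStringType, Arrays.asList("x"), null)
val okTable = new Table(okVec)
val okParts = ColumnVector.fromInts(0, 0)
val partitioned = okTable.partition(okParts, 2) // expected to succeed per the note above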

@revans2 revans2 added the cudf_dependency An issue or PR with this label depends on a new feature in cudf label Apr 28, 2021

revans2 commented Apr 28, 2021

I was able to repro it in cudf and filed an issue there for it: rapidsai/cudf#8098


sameerz commented May 3, 2021

Disabled hash partitioning on arrays in PR #2319 to close this issue, and filed a follow-up issue to re-enable hash partitioning with arrays once the dependent cudf issue/PR are closed.
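
The actual change in PR #2319 is not reproduced here; purely as an illustration of the kind of guard involved, a hypothetical helper (an assumption, not the plugin's real code) could detect array-bearing key types so hash partitioning on them is rejected until the cudf fix lands:

import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}

// Hypothetical helper, not the code from PR #2319: true if a Spark type contains
// an ArrayType anywhere in its nesting, the shape that hits the cudf bug above.
def containsArray(dt: DataType): Boolean = dt match {
  case _: ArrayType   => true
  case st: StructType => st.fields.exists(f => containsArray(f.dataType))
  case mt: MapType    => containsArray(mt.keyType) || containsArray(mt.valueType)
  case _              => false
}

A check like this over the hash partitioning key expressions would be one way to make such plans fall back to the CPU for now.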

@sameerz sameerz closed this as completed May 3, 2021