[BUG] java.lang.IllegalStateException: Expected to only receive a single batch #10645

Closed
zhangjinge588 opened this issue Mar 28, 2024 · 3 comments · Fixed by #10660
Labels: bug (Something isn't working)

@zhangjinge588

Environment: Databricks Spark cluster

Spark RAPIDS configuration:
spark.task.resource.gpu.amount 0.02
spark.rapids.shuffle.multiThreaded.reader.threads 48
spark.executorEnv.PYTHONPATH /databricks/jars/rapids-4-spark_2.12-24.02.0.jar:/databricks/spark/python
spark.shuffle.manager com.nvidia.spark.rapids.spark330db.RapidsShuffleManager
spark.rapids.memory.host.spillStorageSize 16GB
spark.plugins com.nvidia.spark.SQLPlugin
spark.executor.resource.gpu.amount 1
spark.rapids.sql.explain ALL
spark.rapids.filecache.enabled TRUE
spark.rapids.sql.python.gpu.enabled true
spark.rapids.memory.pinnedPool.size 8G
spark.rapids.shuffle.multiThreaded.writer.threads 48
spark.python.daemon.module rapids.daemon_databricks
spark.sql.files.maxPartitionBytes 2GB
spark.rapids.sql.multiThreadedRead.numThreads 100
spark.rapids.sql.concurrentGpuTasks 3

Full Stacktrace:
java.lang.IllegalStateException: Expected to only receive a single batch
at com.nvidia.spark.rapids.ConcatAndConsumeAll$.getSingleBatchWithVerification(GpuCoalesceBatches.scala:103)
at org.apache.spark.sql.rapids.execution.GpuExecutorBroadcastHelper$.getExecutorBroadcastBatch(GpuExecutorBroadcastHelper.scala:100)
at org.apache.spark.sql.rapids.execution.GpuBroadcastHashJoinExec.$anonfun$getExecutorBuiltBatchAndStreamIter$1(GpuBroadcastHashJoinExec.scala:151)
at com.nvidia.spark.rapids.Arm$.closeOnExcept(Arm.scala:98)
at org.apache.spark.sql.rapids.execution.GpuBroadcastHashJoinExec.getExecutorBuiltBatchAndStreamIter(GpuBroadcastHashJoinExec.scala:142)
at org.apache.spark.sql.rapids.execution.GpuBroadcastHashJoinExec.$anonfun$doColumnarExecutorBroadcastJoin$1(GpuBroadcastHashJoinExec.scala:181)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:898)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:898)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:410)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:374)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:410)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:374)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:410)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:374)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:410)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:374)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:410)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:374)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:410)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:374)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:410)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:374)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:81)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:81)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:174)
at org.apache.spark.scheduler.Task.$anonfun$run$4(Task.scala:137)
at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:41)
at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:100)
at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:105)
at scala.util.Using$.resource(Using.scala:269)
at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:104)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:137)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:96)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:902)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1697)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:905)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:760)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)

@zhangjinge588 added the "? - Needs Triage" and "bug" labels on Mar 28, 2024
@zhangjinge588 (Author) commented:

So after some deep diving, I found that this issue occurs after a .join() where at least one of the Spark DataFrames has duplicate rows. For example, df1.join(df2, ...) will throw the error above if df1 has duplicated rows, but df1.dropDuplicates().join(df2, ...) will work.
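
A minimal Scala sketch of the shape described above (the data and column names are invented; this tiny example by itself will not trigger the failure, which also requires the executor-broadcast join path and enough build-side data to produce more than one batch):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// df1 deliberately contains duplicate rows
val df1 = Seq((1, "a"), (1, "a"), (2, "b")).toDF("id", "v")
val df2 = Seq((1, "x"), (2, "y")).toDF("id", "w")

df1.join(df2, "id").show()                  // reported to fail as above
df1.dropDuplicates().join(df2, "id").show() // reported to work
```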

@jlowe (Member) commented Apr 1, 2024:

Thanks for the report @zhangjinge588! I think the issue is that the plugin is hoping for a single batch but not forcing a single batch for the executor broadcast case. It looks like we need to wrap the iterator with a GpuCoalesceIterator that has a goal of RequireSingleBatch.
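
As a rough illustration of that distinction (hypothetical stand-in types, not the plugin's actual API): verification merely asserts the single-batch invariant and throws when it does not hold, while coalescing establishes the invariant before the join consumes the data.

```scala
// Hypothetical stand-in for a columnar batch; the real plugin works on
// GPU ColumnarBatch data.
case class Batch(rows: Int)

// Roughly what the failing path does today: verify and throw.
def getSingleBatchWithVerification(batches: Iterator[Batch]): Batch = {
  val first = batches.next()
  if (batches.hasNext) {
    throw new IllegalStateException("Expected to only receive a single batch")
  }
  first
}

// Roughly what the fix direction calls for: concatenate everything into
// one batch up front (the role GpuCoalesceIterator with a
// RequireSingleBatch goal would play in the plugin).
def coalesceToSingleBatch(batches: Iterator[Batch]): Batch =
  batches.reduce((a, b) => Batch(a.rows + b.rows))
```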

Curious if you know how big the uncompressed broadcast data is? If it's larger than the setting for spark.rapids.sql.batchSizeBytes (default 1G), then it looks like we can get multiple batches. You could try setting spark.rapids.sql.batchSizeBytes to 2G and see if the behavior changes.
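
A sketch of that experiment, assuming the conf can be changed at session scope and that your plugin version accepts size-string values (otherwise set it as a cluster startup conf, e.g. as a raw byte count):

```scala
// Raise the target batch size above the uncompressed broadcast size and
// see if the error goes away.
spark.conf.set("spark.rapids.sql.batchSizeBytes", "2g")
```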

@NVnavkumar (Collaborator) commented:

Closed via #10660
