Full Stacktrace:
java.lang.IllegalStateException: Expected to only receive a single batch
at com.nvidia.spark.rapids.ConcatAndConsumeAll$.getSingleBatchWithVerification(GpuCoalesceBatches.scala:103)
at org.apache.spark.sql.rapids.execution.GpuExecutorBroadcastHelper$.getExecutorBroadcastBatch(GpuExecutorBroadcastHelper.scala:100)
at org.apache.spark.sql.rapids.execution.GpuBroadcastHashJoinExec.$anonfun$getExecutorBuiltBatchAndStreamIter$1(GpuBroadcastHashJoinExec.scala:151)
at com.nvidia.spark.rapids.Arm$.closeOnExcept(Arm.scala:98)
at org.apache.spark.sql.rapids.execution.GpuBroadcastHashJoinExec.getExecutorBuiltBatchAndStreamIter(GpuBroadcastHashJoinExec.scala:142)
at org.apache.spark.sql.rapids.execution.GpuBroadcastHashJoinExec.$anonfun$doColumnarExecutorBroadcastJoin$1(GpuBroadcastHashJoinExec.scala:181)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:898)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:898)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:410)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:374)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:410)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:374)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:410)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:374)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:410)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:374)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:410)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:374)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:410)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:374)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:410)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:374)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:81)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:81)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:174)
at org.apache.spark.scheduler.Task.$anonfun$run$4(Task.scala:137)
at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:41)
at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:100)
at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:105)
at scala.util.Using$.resource(Using.scala:269)
at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:104)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:137)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:96)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:902)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1697)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:905)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:760)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
After some deep diving, I found that this issue occurs after a .join() where at least one of the Spark DataFrames has duplicate rows. For example, df1.join(df2, ...) throws the error above if df1 has duplicated rows, but df1.dropDuplicates().join(df2, ...) works.
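For illustration only (df1 and df2 are hypothetical stand-ins, not the reporter's actual data), a minimal Scala sketch of the pattern described above:

// df1 deliberately contains duplicate rows: only the derived key column is kept.
val df1 = spark.range(0, 10000000L).selectExpr("id % 1000 AS key")
val df2 = spark.range(0, 1000L).selectExpr("id AS key", "id AS v")

// Reported to fail with "Expected to only receive a single batch":
// df1.join(df2, "key").count()

// Reported workaround: de-duplicate the side with duplicate rows first.
df1.dropDuplicates().join(df2, "key").count()

Note that in this sketch dropDuplicates() also makes df1 much smaller, so it does not separate duplicate rows from sheer data size as the cause.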
Thanks for the report @zhangjinge588! I think the issue is that the plugin expects a single batch but does not force a single batch for the executor broadcast case. It looks like we need to wrap the iterator with a GpuCoalesceIterator that has a goal of RequireSingleBatch.
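For illustration only, a minimal sketch of that difference (this is not the plugin's actual implementation; the types are simplified stand-ins for columnar batches):

// Current behavior: take one batch and fail if any more arrive.
def getSingleBatchWithVerification[Batch](it: Iterator[Batch]): Batch = {
  val batch = it.next()
  if (it.hasNext) throw new IllegalStateException("Expected to only receive a single batch")
  batch
}

// Proposed behavior: concatenate however many batches arrive into one,
// the way a coalescing iterator with a RequireSingleBatch goal would.
// Batches are modeled here as Seq[Int]; ++ stands in for GPU concatenation.
def coalesceToSingleBatch(it: Iterator[Seq[Int]]): Seq[Int] =
  it.reduceLeft(_ ++ _)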
Curious if you know how big the uncompressed broadcast data is? If it's larger than the setting for spark.rapids.sql.batchSizeBytes (default 1 GiB), it looks like we can get multiple batches. You could try setting spark.rapids.sql.batchSizeBytes to 2G and see if the behavior changes.
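For example (assuming a live SparkSession named spark and that this conf can be set at runtime; whether a size suffix like "2g" is accepted may depend on the plugin version, hence the raw byte count shown as an alternative):

spark.conf.set("spark.rapids.sql.batchSizeBytes", "2g")
// or as an explicit byte count just under 2 GiB:
// spark.conf.set("spark.rapids.sql.batchSizeBytes", "2147483647")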
Databricks Spark Cluster
Spark Rapids Configuration:
spark.task.resource.gpu.amount 0.02
spark.rapids.shuffle.multiThreaded.reader.threads 48
spark.executorEnv.PYTHONPATH /databricks/jars/rapids-4-spark_2.12-24.02.0.jar:/databricks/spark/python
spark.shuffle.manager com.nvidia.spark.rapids.spark330db.RapidsShuffleManager
spark.rapids.memory.host.spillStorageSize 16GB
spark.plugins com.nvidia.spark.SQLPlugin
spark.executor.resource.gpu.amount 1
spark.rapids.sql.explain ALL
spark.rapids.filecache.enabled TRUE
spark.rapids.sql.python.gpu.enabled true
spark.rapids.memory.pinnedPool.size 8G
spark.rapids.shuffle.multiThreaded.writer.threads 48
spark.python.daemon.module rapids.daemon_databricks
spark.sql.files.maxPartitionBytes 2GB
spark.rapids.sql.multiThreadedRead.numThreads 100
spark.rapids.sql.concurrentGpuTasks 3