[BUG] Shuffled join OOM with 4GB of GPU memory #10319

Closed
Opened by jlowe on Jan 29, 2024 · 1 comment
Labels: bug (Something isn't working), reliability (Features to improve reliability or bugs that severely impact the reliability of the plugin)

jlowe (Member) commented Jan 29, 2024

The following shuffled join experiment throws an OOM when run with a 4GB GPU (i.e.: spark.rapids.memory.gpu.allocSize=4g):

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "1")
spark.conf.set("spark.sql.shuffle.partitions", "4")
val df1 = spark.read.parquet("/tmp/pa")
val df2 = spark.read.parquet("/tmp/pb")
spark.time(df1.join(df2, df1.col("id") === df2.col("id"), "inner").count)

/tmp/pa and /tmp/pb were created like this from a previous session:

spark.range(10).coalesce(1).write.parquet("/tmp/pa")
spark.range(2147483648L).coalesce(1).write.parquet("/tmp/pb")

OOM details:
24/01/29 20:43:26 WARN DeviceMemoryEventHandler: Device store exhausted, unable to allocate 1006424960 bytes. Total RMM allocated is 2147084032 bytes.
24/01/29 20:43:26 WARN GpuSemaphore: Dumping stack traces. The semaphore sees 4 tasks, 2 threads are holding onto the semaphore. 
Semaphore held. Stack trace for task attempt id 472:
    com.nvidia.spark.rapids.DeviceMemoryEventHandler.onAllocFailure(DeviceMemoryEventHandler.scala:143)
    ai.rapids.cudf.Table.partition(Native Method)
    ai.rapids.cudf.Table.partition(Table.java:1995)
    com.nvidia.spark.rapids.GpuHashPartitioningBase$.$anonfun$hashPartitionAndClose$7(GpuHashPartitioningBase.scala:82)
    com.nvidia.spark.rapids.GpuHashPartitioningBase$$$Lambda$5292/734187643.apply(Unknown Source)
    com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
    com.nvidia.spark.rapids.GpuHashPartitioningBase$.$anonfun$hashPartitionAndClose$6(GpuHashPartitioningBase.scala:81)
    com.nvidia.spark.rapids.GpuHashPartitioningBase$$$Lambda$5291/620426084.apply(Unknown Source)
    com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
    com.nvidia.spark.rapids.GpuHashPartitioningBase$.$anonfun$hashPartitionAndClose$2(GpuHashPartitioningBase.scala:80)
    com.nvidia.spark.rapids.GpuHashPartitioningBase$$$Lambda$5282/767905282.apply(Unknown Source)
    com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
    com.nvidia.spark.rapids.GpuHashPartitioningBase$.$anonfun$hashPartitionAndClose$1(GpuHashPartitioningBase.scala:72)
    com.nvidia.spark.rapids.GpuHashPartitioningBase$$$Lambda$5281/817548700.apply(Unknown Source)
    com.nvidia.spark.rapids.RmmRapidsRetryIterator$AutoCloseableAttemptSpliterator.next(RmmRapidsRetryIterator.scala:477)
    com.nvidia.spark.rapids.RmmRapidsRetryIterator$RmmRapidsRetryIterator.next(RmmRapidsRetryIterator.scala:613)
    com.nvidia.spark.rapids.RmmRapidsRetryIterator$RmmRapidsRetryAutoCloseableIterator.next(RmmRapidsRetryIterator.scala:517)
    com.nvidia.spark.rapids.RmmRapidsRetryIterator$.drainSingleWithVerification(RmmRapidsRetryIterator.scala:291)
    com.nvidia.spark.rapids.RmmRapidsRetryIterator$.withRetryNoSplit(RmmRapidsRetryIterator.scala:132)
    com.nvidia.spark.rapids.GpuHashPartitioningBase$.hashPartitionAndClose(GpuHashPartitioningBase.scala:71)
    org.apache.spark.sql.rapids.execution.GpuBatchSubPartitioner.partitionBatches(GpuSubPartitionHashJoin.scala:181)
    org.apache.spark.sql.rapids.execution.GpuBatchSubPartitioner.initPartitions(GpuSubPartitionHashJoin.scala:156)
    org.apache.spark.sql.rapids.execution.GpuBatchSubPartitioner.batchesCount(GpuSubPartitionHashJoin.scala:106)
    org.apache.spark.sql.rapids.execution.GpuSubPartitionPairIterator.$anonfun$hasNextBatch$1(GpuSubPartitionHashJoin.scala:397)
    org.apache.spark.sql.rapids.execution.GpuSubPartitionPairIterator$$Lambda$5455/1446338990.apply$mcZ$sp(Unknown Source)
    org.apache.spark.sql.rapids.execution.GpuSubPartitionPairIterator.tryPullNextPair(GpuSubPartitionHashJoin.scala:406)
    org.apache.spark.sql.rapids.execution.GpuSubPartitionPairIterator.hasNext(GpuSubPartitionHashJoin.scala:365)
    org.apache.spark.sql.rapids.execution.BaseSubHashJoinIterator.$anonfun$hasNext$5(GpuSubPartitionHashJoin.scala:509)
    org.apache.spark.sql.rapids.execution.BaseSubHashJoinIterator$$Lambda$5457/748018050.apply$mcZ$sp(Unknown Source)
    scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
    com.nvidia.spark.rapids.GpuMetric.ns(GpuExec.scala:150)
    org.apache.spark.sql.rapids.execution.BaseSubHashJoinIterator.hasNext(GpuSubPartitionHashJoin.scala:509)
    scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    com.nvidia.spark.rapids.AbstractProjectSplitIterator.hasNext(basicPhysicalOperators.scala:233)
    scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    com.nvidia.spark.rapids.GpuMergeAggregateIterator.$anonfun$next$2(GpuAggregateExec.scala:751)
    com.nvidia.spark.rapids.GpuMergeAggregateIterator$$Lambda$4817/20299667.apply(Unknown Source)
    scala.Option.getOrElse(Option.scala:189)
    com.nvidia.spark.rapids.GpuMergeAggregateIterator.next(GpuAggregateExec.scala:749)
    com.nvidia.spark.rapids.GpuMergeAggregateIterator.next(GpuAggregateExec.scala:711)
    scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    com.nvidia.spark.rapids.DynamicGpuPartialSortAggregateIterator.$anonfun$next$6(GpuAggregateExec.scala:2042)
    com.nvidia.spark.rapids.DynamicGpuPartialSortAggregateIterator$$Lambda$4815/72242831.apply(Unknown Source)
    scala.Option.map(Option.scala:230)
    com.nvidia.spark.rapids.DynamicGpuPartialSortAggregateIterator.next(GpuAggregateExec.scala:2042)
    com.nvidia.spark.rapids.DynamicGpuPartialSortAggregateIterator.next(GpuAggregateExec.scala:1906)
    org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase$$anon$1.partNextBatch(GpuShuffleExchangeExecBase.scala:333)
    org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase$$anon$1.hasNext(GpuShuffleExchangeExecBase.scala:355)
    org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
    org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    org.apache.spark.scheduler.Task.run(Task.scala:131)
    org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    org.apache.spark.executor.Executor$TaskRunner$$Lambda$2745/1333587198.apply(Unknown Source)
    org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    java.lang.Thread.run(Thread.java:750)

Semaphore held. Stack trace for task attempt id 474:
    java.lang.Thread.getStackTrace(Thread.java:1564)
    com.nvidia.spark.rapids.GpuSemaphore.$anonfun$dumpActiveStackTracesToLog$2(GpuSemaphore.scala:316)
    com.nvidia.spark.rapids.GpuSemaphore.$anonfun$dumpActiveStackTracesToLog$2$adapted(GpuSemaphore.scala:314)
    scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    com.nvidia.spark.rapids.GpuSemaphore.$anonfun$dumpActiveStackTracesToLog$1(GpuSemaphore.scala:314)
    com.nvidia.spark.rapids.GpuSemaphore.$anonfun$dumpActiveStackTracesToLog$1$adapted(GpuSemaphore.scala:312)
    java.util.concurrent.ConcurrentHashMap.forEach(ConcurrentHashMap.java:1597)
    com.nvidia.spark.rapids.GpuSemaphore.dumpActiveStackTracesToLog(GpuSemaphore.scala:312)
    com.nvidia.spark.rapids.GpuSemaphore$.dumpActiveStackTracesToLog(GpuSemaphore.scala:89)
    com.nvidia.spark.rapids.DeviceMemoryEventHandler.onAllocFailure(DeviceMemoryEventHandler.scala:145)
    ai.rapids.cudf.Rmm.allocInternal(Native Method)
    ai.rapids.cudf.Rmm.alloc(Rmm.java:506)
    ai.rapids.cudf.DeviceMemoryBuffer.allocate(DeviceMemoryBuffer.java:147)
    ai.rapids.cudf.DeviceMemoryBuffer.allocate(DeviceMemoryBuffer.java:137)
    com.nvidia.spark.rapids.RapidsBufferStore$RapidsBufferBase.$anonfun$getDeviceMemoryBuffer$6(RapidsBufferStore.scala:515)
    com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
    com.nvidia.spark.rapids.RapidsBufferStore$RapidsBufferBase.getDeviceMemoryBuffer(RapidsBufferStore.scala:514)
    com.nvidia.spark.rapids.RapidsBufferStore$RapidsBufferBase.getColumnarBatch(RapidsBufferStore.scala:452)
    com.nvidia.spark.rapids.SpillableColumnarBatchImpl.$anonfun$getColumnarBatch$1(SpillableColumnarBatch.scala:112)
    com.nvidia.spark.rapids.SpillableColumnarBatchImpl.$anonfun$withRapidsBuffer$1(SpillableColumnarBatch.scala:95)
    com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
    com.nvidia.spark.rapids.SpillableColumnarBatchImpl.withRapidsBuffer(SpillableColumnarBatch.scala:94)
    com.nvidia.spark.rapids.SpillableColumnarBatchImpl.getColumnarBatch(SpillableColumnarBatch.scala:110)
    com.nvidia.spark.rapids.GpuShuffledHashJoinExec$.$anonfun$getFilteredBuildBatches$6(GpuShuffledHashJoinExec.scala:376)
    com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
    com.nvidia.spark.rapids.GpuShuffledHashJoinExec$.$anonfun$getFilteredBuildBatches$5(GpuShuffledHashJoinExec.scala:376)
    scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    scala.collection.Iterator$ConcatIterator.next(Iterator.scala:232)
    com.nvidia.spark.rapids.CollectTimeIterator.$anonfun$next$1(GpuExec.scala:192)
    com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
    com.nvidia.spark.rapids.CollectTimeIterator.next(GpuExec.scala:191)
    scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    org.apache.spark.sql.rapids.execution.GpuBatchSubPartitioner.partitionBatches(GpuSubPartitionHashJoin.scala:176)
    org.apache.spark.sql.rapids.execution.GpuBatchSubPartitioner.initPartitions(GpuSubPartitionHashJoin.scala:156)
    org.apache.spark.sql.rapids.execution.GpuBatchSubPartitioner.batchesCount(GpuSubPartitionHashJoin.scala:106)
    org.apache.spark.sql.rapids.execution.GpuSubPartitionPairIterator.$anonfun$hasNextBatch$1(GpuSubPartitionHashJoin.scala:397)
    org.apache.spark.sql.rapids.execution.GpuSubPartitionPairIterator.tryPullNextPair(GpuSubPartitionHashJoin.scala:406)
    org.apache.spark.sql.rapids.execution.GpuSubPartitionPairIterator.hasNext(GpuSubPartitionHashJoin.scala:365)
    org.apache.spark.sql.rapids.execution.BaseSubHashJoinIterator.$anonfun$hasNext$5(GpuSubPartitionHashJoin.scala:509)
    scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
    com.nvidia.spark.rapids.GpuMetric.ns(GpuExec.scala:150)
    org.apache.spark.sql.rapids.execution.BaseSubHashJoinIterator.hasNext(GpuSubPartitionHashJoin.scala:509)
    scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    com.nvidia.spark.rapids.AbstractProjectSplitIterator.hasNext(basicPhysicalOperators.scala:233)
    scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    com.nvidia.spark.rapids.GpuMergeAggregateIterator.$anonfun$next$2(GpuAggregateExec.scala:751)
    scala.Option.getOrElse(Option.scala:189)
    com.nvidia.spark.rapids.GpuMergeAggregateIterator.next(GpuAggregateExec.scala:749)
    com.nvidia.spark.rapids.GpuMergeAggregateIterator.next(GpuAggregateExec.scala:711)
    scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    com.nvidia.spark.rapids.DynamicGpuPartialSortAggregateIterator.$anonfun$next$6(GpuAggregateExec.scala:2042)
    scala.Option.map(Option.scala:230)
    com.nvidia.spark.rapids.DynamicGpuPartialSortAggregateIterator.next(GpuAggregateExec.scala:2042)
    com.nvidia.spark.rapids.DynamicGpuPartialSortAggregateIterator.next(GpuAggregateExec.scala:1906)
    org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase$$anon$1.partNextBatch(GpuShuffleExchangeExecBase.scala:333)
    org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase$$anon$1.hasNext(GpuShuffleExchangeExecBase.scala:355)
    org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
    org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    org.apache.spark.scheduler.Task.run(Task.scala:131)
    org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    java.lang.Thread.run(Thread.java:750)




24/01/29 20:43:26 WARN DeviceMemoryEventHandler: [RETRY 1] Retrying allocation of 1072305792 after a synchronize. Total RMM allocated is 2147084032 bytes.
24/01/29 20:43:26 WARN DeviceMemoryEventHandler: [RETRY 2] Retrying allocation of 1072305792 after a synchronize. Total RMM allocated is 2147084032 bytes.
24/01/29 20:43:26 WARN DeviceMemoryEventHandler: Device store exhausted, unable to allocate 1072305792 bytes. Total RMM allocated is 2147084032 bytes.
24/01/29 20:43:26 WARN DeviceMemoryEventHandler: [RETRY 1] Retrying allocation of 1006424960 after a synchronize. Total RMM allocated is 2147084032 bytes.
24/01/29 20:43:26 WARN DeviceMemoryEventHandler: [RETRY 2] Retrying allocation of 1006424960 after a synchronize. Total RMM allocated is 2147084032 bytes.
24/01/29 20:43:26 WARN DeviceMemoryEventHandler: Device store exhausted, unable to allocate 1006424960 bytes. Total RMM allocated is 2147084032 bytes.
24/01/29 20:43:26 WARN DeviceMemoryEventHandler: [RETRY 1] Retrying allocation of 1073726848 after a synchronize. Total RMM allocated is 2147084032 bytes.
24/01/29 20:43:26 WARN DeviceMemoryEventHandler: [RETRY 2] Retrying allocation of 1073726848 after a synchronize. Total RMM allocated is 2147084032 bytes.
24/01/29 20:43:26 WARN DeviceMemoryEventHandler: Device store exhausted, unable to allocate 1073726848 bytes. Total RMM allocated is 2147084032 bytes.
24/01/29 20:43:26 ERROR Executor: Exception in task 3.0 in stage 23.0 (TID 474)
com.nvidia.spark.rapids.jni.GpuRetryOOM: GPU OutOfMemory
	at ai.rapids.cudf.Rmm.allocInternal(Native Method)
	at ai.rapids.cudf.Rmm.alloc(Rmm.java:506)
	at ai.rapids.cudf.DeviceMemoryBuffer.allocate(DeviceMemoryBuffer.java:147)
	at ai.rapids.cudf.DeviceMemoryBuffer.allocate(DeviceMemoryBuffer.java:137)
	at com.nvidia.spark.rapids.RapidsBufferStore$RapidsBufferBase.$anonfun$getDeviceMemoryBuffer$6(RapidsBufferStore.scala:515)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at com.nvidia.spark.rapids.RapidsBufferStore$RapidsBufferBase.getDeviceMemoryBuffer(RapidsBufferStore.scala:514)
	at com.nvidia.spark.rapids.RapidsBufferStore$RapidsBufferBase.getColumnarBatch(RapidsBufferStore.scala:452)
	at com.nvidia.spark.rapids.SpillableColumnarBatchImpl.$anonfun$getColumnarBatch$1(SpillableColumnarBatch.scala:112)
	at com.nvidia.spark.rapids.SpillableColumnarBatchImpl.$anonfun$withRapidsBuffer$1(SpillableColumnarBatch.scala:95)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at com.nvidia.spark.rapids.SpillableColumnarBatchImpl.withRapidsBuffer(SpillableColumnarBatch.scala:94)
	at com.nvidia.spark.rapids.SpillableColumnarBatchImpl.getColumnarBatch(SpillableColumnarBatch.scala:110)
	at com.nvidia.spark.rapids.GpuShuffledHashJoinExec$.$anonfun$getFilteredBuildBatches$6(GpuShuffledHashJoinExec.scala:376)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at com.nvidia.spark.rapids.GpuShuffledHashJoinExec$.$anonfun$getFilteredBuildBatches$5(GpuShuffledHashJoinExec.scala:376)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$ConcatIterator.next(Iterator.scala:232)
	at com.nvidia.spark.rapids.CollectTimeIterator.$anonfun$next$1(GpuExec.scala:192)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at com.nvidia.spark.rapids.CollectTimeIterator.next(GpuExec.scala:191)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.sql.rapids.execution.GpuBatchSubPartitioner.partitionBatches(GpuSubPartitionHashJoin.scala:176)
	at org.apache.spark.sql.rapids.execution.GpuBatchSubPartitioner.initPartitions(GpuSubPartitionHashJoin.scala:156)
	at org.apache.spark.sql.rapids.execution.GpuBatchSubPartitioner.batchesCount(GpuSubPartitionHashJoin.scala:106)
	at org.apache.spark.sql.rapids.execution.GpuSubPartitionPairIterator.$anonfun$hasNextBatch$1(GpuSubPartitionHashJoin.scala:397)
	at org.apache.spark.sql.rapids.execution.GpuSubPartitionPairIterator.tryPullNextPair(GpuSubPartitionHashJoin.scala:406)
	at org.apache.spark.sql.rapids.execution.GpuSubPartitionPairIterator.hasNext(GpuSubPartitionHashJoin.scala:365)
	at org.apache.spark.sql.rapids.execution.BaseSubHashJoinIterator.$anonfun$hasNext$5(GpuSubPartitionHashJoin.scala:509)
	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
	at com.nvidia.spark.rapids.GpuMetric.ns(GpuExec.scala:150)
	at org.apache.spark.sql.rapids.execution.BaseSubHashJoinIterator.hasNext(GpuSubPartitionHashJoin.scala:509)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at com.nvidia.spark.rapids.AbstractProjectSplitIterator.hasNext(basicPhysicalOperators.scala:233)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at com.nvidia.spark.rapids.GpuMergeAggregateIterator.$anonfun$next$2(GpuAggregateExec.scala:751)
	at scala.Option.getOrElse(Option.scala:189)
	at com.nvidia.spark.rapids.GpuMergeAggregateIterator.next(GpuAggregateExec.scala:749)
	at com.nvidia.spark.rapids.GpuMergeAggregateIterator.next(GpuAggregateExec.scala:711)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at com.nvidia.spark.rapids.DynamicGpuPartialSortAggregateIterator.$anonfun$next$6(GpuAggregateExec.scala:2042)
	at scala.Option.map(Option.scala:230)
	at com.nvidia.spark.rapids.DynamicGpuPartialSortAggregateIterator.next(GpuAggregateExec.scala:2042)
	at com.nvidia.spark.rapids.DynamicGpuPartialSortAggregateIterator.next(GpuAggregateExec.scala:1906)
	at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase$$anon$1.partNextBatch(GpuShuffleExchangeExecBase.scala:333)
	at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase$$anon$1.hasNext(GpuShuffleExchangeExecBase.scala:355)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

This was with the ARENA allocator.

jlowe added the bug (Something isn't working), ? - Needs Triage (Need team to review and classify), and reliability (Features to improve reliability or bugs that severely impact the reliability of the plugin) labels on Jan 29, 2024
jlowe (Member, Author) commented Jan 29, 2024

It looks like we can get an OOM in this code in GpuShuffledHashJoinExec.scala:

        logDebug("Return multiple batches as the build side data for the following " +
          "sub-partitioning join in null-filtering mode.")
        val safeIter = GpuSubPartitionHashJoin.safeIteratorFromSeq(spillBuf.toSeq).map { sp =>
          withResource(sp)(_.getColumnarBatch())

There's no withRetry or withRetryNoSplit here, so if getColumnarBatch throws an OOM, it fails the task directly rather than pausing the task for retries.
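
For illustration only, here is a minimal sketch of what wrapping that unspill in the retry framework might look like, assuming withRetryNoSplit can take a single spillable batch as its closeable input (this is not a proposed patch, just the shape of the missing retry scope):

        val safeIter = GpuSubPartitionHashJoin.safeIteratorFromSeq(spillBuf.toSeq).map { sp =>
          // Hypothetical: an allocation failure inside getColumnarBatch would surface as a
          // retryable OOM handled by the retry framework, pausing the task instead of
          // failing it outright.
          RmmRapidsRetryIterator.withRetryNoSplit(sp) { attempt =>
            attempt.getColumnarBatch()
          }
        }

Like the original withResource, withRetryNoSplit would be expected to take ownership of sp and close it once the batch has been materialized.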

@revans2 pointed out that there are iterators returning ColumnarBatch that maybe should be returning SpillableColumnarBatch, since there are often cases where one spillable gets "unspilled" to send down the iterator only to be turned into another spillable for retry processing. We may want to consider having all execs receive spillable batches rather than unspillable ones, since they generally only want to deal with spillable batches when doing proper retry processing. Yes, that's a big change, but it might make it easier to find code that isn't doing retries correctly.
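
To make the shape of that suggestion concrete, here is a hypothetical consumer-side sketch (consume is a placeholder name, not an existing plugin API; the imports follow the package names visible in the stack traces above): upstream iterators would hand out SpillableColumnarBatch, and each exec would only materialize a ColumnarBatch inside its own retry scope.

        import com.nvidia.spark.rapids.{RmmRapidsRetryIterator, SpillableColumnarBatch}
        import com.nvidia.spark.rapids.Arm.withResource

        // Hypothetical sketch: batches stay spillable until the consuming exec needs them.
        def consume(it: Iterator[SpillableColumnarBatch]): Unit = {
          it.foreach { scb =>
            // Materialize under retry; scb is owned (and closed) by the retry block,
            // so there is no unspill-then-rewrap round trip between execs.
            withResource(RmmRapidsRetryIterator.withRetryNoSplit(scb)(_.getColumnarBatch())) { cb =>
              cb.numRows() // placeholder for the exec's actual per-batch work
            }
          }
        }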
