Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Databricks 13.3 BroadcastHashJoin using executor side broadcast fed by ColumnarToRow [Databricks] #10230

Merged
merged 28 commits into from
Jan 22, 2024

Conversation

tgravescs
Copy link
Collaborator

fixes #10165

Here the issue is that Databricks supports a special executor side broadcast in BroadcastHashJoin and in this case the BroadcastHashJoin ended up being on the CPU. The BroadcastHashJoin can't be fed by a ColumnarToRow. It expects to be fed by an exchange that specifically handles going to a single partition.
There is a similar issue in Databricks 12.2 that we fixed by inserting an extra GpuColumnarToRow and then an Exchange. I use similar logic here to fix this and filed a followup to investigate the performance of now having 2 exchanges (#10229).

The beginning plan looks like:

Caused by: org.apache.spark.SparkException: Unexpected build plan for Executor Side Broadcast Join: 
ColumnarToRow
+- AQEShuffleRead ebj
   +- ShuffleQueryStage 132, Statistics(sizeInBytes=7.8 MiB, rowCount=2.74E+5, ColumnStat: N/A, isRuntime=true)
      +- GpuColumnarExchange gpuhashpartitioning(cast(user_id#83458 as int), 200), ENSURE_REQUIREMENTS, [plan_id=394739]
         +- GpuProject [gpucoalesce(user_id#84619, cast(user_id#84639 as string)) AS user_id#83458, if ((gpucoalesce(is_deleted#84640, true) AND NOT (cast(user_id#84619 as int) = 117563))) gdpr-deleted else gpucoalesce(email_address#84631, email_address#84646) AS email_address#83462]
            +- GpuRowToColumnar targetsize(268435456)

After this fix the plan looks like:

 +- Exchange (SinglePartition, EXECUTOR_BROADCAST)
             *     +- GpuColumnarToRow
             *         +- GpuShuffleCoalesce
             *             +- ShuffleQueryStage
             *                 +- GpuColumnarExchange

Added an integration test for databricks. The test throws an exception without the fix and then passes with the fix. This new logic only occurs on Databricks 13.3 and the parameters to AQEShuffleReadExec changed between 12.2 and 13.3 so the shim code went into a specific Databricks 13.3 file.

@tgravescs tgravescs added the bug Something isn't working label Jan 19, 2024
@tgravescs tgravescs self-assigned this Jan 19, 2024
@tgravescs
Copy link
Collaborator Author

Note, not kicking the build because I know the databricks one is currently broken from other issues

@sameerz
Copy link
Collaborator

sameerz commented Jan 22, 2024

build

Copy link
Collaborator

@revans2 revans2 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I really would like more eyes on it than just mine.

@tgravescs tgravescs merged commit b3839e4 into NVIDIA:branch-24.02 Jan 22, 2024
41 checks passed
@tgravescs tgravescs deleted the debugExecBroadcast branch January 22, 2024 15:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[BUG] Databricks 13.3 executor side broadcast failure
4 participants