Improve warnings about AQE nodes not supported on GPU #647

Merged 2 commits on Sep 3, 2020
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.read.Scan
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.adaptive.CustomShuffleReaderExec
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, CustomShuffleReaderExec, ShuffleQueryStageExec}
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.execution.command.{DataWritingCommand, DataWritingCommandExec}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoHadoopFsRelationCommand}
@@ -1735,6 +1735,24 @@ object GpuOverrides {
           GpuCustomShuffleReaderExec(childPlans.head.convertIfNeeded(),
             exec.partitionSpecs)
         }
       }),
+    exec[AdaptiveSparkPlanExec]("Adaptive query", (exec, conf, p, r) =>
+      new SparkPlanMeta[AdaptiveSparkPlanExec](exec, conf, p, r) {
+        override def tagPlanForGpu(): Unit =
+          willNotWorkOnGpu("this is an adaptive plan")
Review comment from Member:
Do we need to be a bit more descriptive here? I can see users wondering why it says "this is an adaptive plan" when the docs say the plugin supports AQE: "If it supports AQE, why isn't it handling adaptive plans? Isn't that the whole point of the plugin supporting AQE?" 😄

Reply from Contributor (Author):
Yeah, I agree. This whole approach is still confusing. What I really wanted to do was not have any messages related to the AQE operators. Currently, we either replace an operator or show a warning. I think we need a third option of "this does not need replacing". I'm going to look at this again today and see if this can be implemented without major surgery on GpuOverrides.

Reply from Collaborator:
I totally agree. There are other places where we want this too. Perhaps it is just a list of class names that we should not warn about at all, as in the sketch below.
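
A minimal sketch of that allow-list idea, assuming a simple set keyed by plan class. The three class names come from this PR's diff; the doNotWarnExecs set and the shouldSuppressWarning helper are hypothetical illustrations, not part of the plugin:

  // Hypothetical sketch of the "list of class names" suggestion above.
  // The three classes are the ones handled in this PR; the set and the
  // helper are illustrative only.
  val doNotWarnExecs: Set[Class[_ <: SparkPlan]] = Set(
    classOf[AdaptiveSparkPlanExec],
    classOf[BroadcastQueryStageExec],
    classOf[ShuffleQueryStageExec])

  // Consulted before logging a "will not work on GPU" message.
  def shouldSuppressWarning(plan: SparkPlan): Boolean =
    doNotWarnExecs.contains(plan.getClass)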

Reply from Contributor (Author):
I pushed an update to introduce a new DoNotReplaceSparkPlanMeta rule and logic to suppress warnings about operators that use this rule. I manually tested with TPC-DS with AQE and it looks a lot cleaner now.
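
As a rough sketch of what such a rule could look like (hedged: the actual DoNotReplaceSparkPlanMeta in the updated commit may differ; the constructor shape simply mirrors the SparkPlanMeta usage visible in this diff, and the suppressWillWorkOnGpuInfo flag is an assumed mechanism for skipping the warning):

  // Hedged sketch only: the real DoNotReplaceSparkPlanMeta may differ.
  class DoNotReplaceSparkPlanMeta[INPUT <: SparkPlan](
      plan: INPUT,
      conf: RapidsConf,
      parent: Option[RapidsMeta[_, _, _]],
      rule: ConfKeysAndIncompat)
    extends SparkPlanMeta[INPUT](plan, conf, parent, rule) {

    // Assumed flag: tells GpuOverrides not to report this node among the
    // operators that will not run on the GPU.
    override val suppressWillWorkOnGpuInfo: Boolean = true

    override def tagPlanForGpu(): Unit =
      willNotWorkOnGpu(s"${plan.getClass.getSimpleName} does not need to run on the GPU")

    // These nodes are never converted, so conversion is a logic error.
    override def convertToGpu(): GpuExec =
      throw new IllegalStateException("plan should not be replaced")
  }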

+        override def convertToGpu(): GpuExec = throw new IllegalStateException()
+      }),
+    exec[BroadcastQueryStageExec]("Broadcast query stage", (exec, conf, p, r) =>
+      new SparkPlanMeta[BroadcastQueryStageExec](exec, conf, p, r) {
+        override def tagPlanForGpu(): Unit =
+          willNotWorkOnGpu("this query stage already started executing")
+        override def convertToGpu(): GpuExec = throw new IllegalStateException()
+      }),
+    exec[ShuffleQueryStageExec]("Shuffle query stage", (exec, conf, p, r) =>
+      new SparkPlanMeta[ShuffleQueryStageExec](exec, conf, p, r) {
+        override def tagPlanForGpu(): Unit =
+          willNotWorkOnGpu("this query stage already started executing")
+        override def convertToGpu(): GpuExec = throw new IllegalStateException()
+      })
   ).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap
   val execs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] =