Skip to content

Commit

Permalink
Qualification tool: Parsing Execs to get the ExecInfo #1 (#5420)
Browse files Browse the repository at this point in the history
* Qualification tool: Parsing supported execs

Signed-off-by: Niranjan Artal <nartal@nvidia.com>

* addressed review comments

Signed-off-by: Niranjan Artal <nartal@nvidia.com>
  • Loading branch information
nartal1 authored May 4, 2022
1 parent 76b05c3 commit a5733d2
Show file tree
Hide file tree
Showing 11 changed files with 365 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.planparser

import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker

import org.apache.spark.sql.execution.ui.SparkPlanGraphNode

/**
 * Produces the [[ExecInfo]] entry for a single plan graph node.
 *
 * @param node    the plan graph node being parsed
 * @param checker used to look up whether the exec is supported and its speedup factor
 * @param sqlID   id of the SQL query the node belongs to
 */
case class CoalesceExecParser(
    node: SparkPlanGraphNode,
    checker: PluginTypeChecker,
    sqlID: Long) extends ExecParser {

  /** Name used for the checker lookups: the node name with an "Exec" suffix. */
  val fullExecName: String = s"${node.name}Exec"

  /**
   * Builds a one-element sequence describing this node.
   *
   * The speedup factor is only requested from the checker when the exec is
   * reported as supported; otherwise it falls back to 1.
   */
  override def parse: Seq[ExecInfo] = {
    val supported = checker.isExecSupported(fullExecName)
    val speedup = if (supported) checker.getSpeedupFactor(fullExecName) else 1
    // TODO - add in parsing expressions - average speedup across?
    // coalesce doesn't have duration, hence duration = None
    val info = ExecInfo(sqlID, node.name, expr = "", speedup,
      duration = None, node.id, supported, None)
    Seq(info)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.planparser

import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker

import org.apache.spark.sql.execution.ui.SparkPlanGraphNode

/**
 * Produces the [[ExecInfo]] entry for a single plan graph node.
 *
 * @param node    the plan graph node being parsed
 * @param checker used to look up whether the exec is supported and its speedup factor
 * @param sqlID   id of the SQL query the node belongs to
 */
case class CollectLimitExecParser(
    node: SparkPlanGraphNode,
    checker: PluginTypeChecker,
    sqlID: Long) extends ExecParser {

  /** Name used for the checker lookups: the node name with an "Exec" suffix. */
  val fullExecName: String = s"${node.name}Exec"

  /**
   * Builds a one-element sequence describing this node.
   *
   * The speedup factor is only requested from the checker when the exec is
   * reported as supported; otherwise it falls back to 1.
   */
  override def parse: Seq[ExecInfo] = {
    val supported = checker.isExecSupported(fullExecName)
    val speedup = if (supported) checker.getSpeedupFactor(fullExecName) else 1
    // TODO - add in parsing expressions - average speedup across?
    // collect doesn't have duration, hence duration = None
    val info = ExecInfo(sqlID, node.name, expr = "", speedup,
      duration = None, node.id, supported, None)
    Seq(info)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.planparser

import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker

import org.apache.spark.sql.execution.ui.SparkPlanGraphNode

/**
 * Produces the [[ExecInfo]] entry for a single plan graph node.
 *
 * @param node    the plan graph node being parsed
 * @param checker used to look up whether the exec is supported and its speedup factor
 * @param sqlID   id of the SQL query the node belongs to
 */
case class ExpandExecParser(
    node: SparkPlanGraphNode,
    checker: PluginTypeChecker,
    sqlID: Long) extends ExecParser {

  /** Name used for the checker lookups: the node name with an "Exec" suffix. */
  val fullExecName: String = s"${node.name}Exec"

  /**
   * Builds a one-element sequence describing this node.
   *
   * The speedup factor is only requested from the checker when the exec is
   * reported as supported; otherwise it falls back to 1.
   */
  override def parse: Seq[ExecInfo] = {
    val supported = checker.isExecSupported(fullExecName)
    val speedup = if (supported) checker.getSpeedupFactor(fullExecName) else 1
    // TODO - add in parsing expressions - average speedup across?
    // Expand doesn't have duration, hence duration = None
    val info = ExecInfo(sqlID, node.name, expr = "", speedup,
      duration = None, node.id, supported, None)
    Seq(info)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ case class FilterExecParser(
override def parse: Seq[ExecInfo] = {
// filter doesn't have duration
val duration = None
val (filterSpeedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
(checker.getSpeedupFactor(fullExecName), true)
} else {
(1, false)
}
// TODO - add in parsing expressions - average speedup across?
Seq(ExecInfo(sqlID, node.name, "", filterSpeedupFactor,
Seq(ExecInfo(sqlID, node.name, "", speedupFactor,
duration, node.id, isSupported, None))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ case class ProjectExecParser(
override def parse: Seq[ExecInfo] = {
// Project doesn't have duration
val duration = None
val (filterSpeedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
val (speedupFactor, isSupported) = if (checker.isExecSupported(fullExecName)) {
(checker.getSpeedupFactor(fullExecName), true)
} else {
(1, false)
}
// TODO - add in parsing expressions - average speedup across?
Seq(ExecInfo(sqlID, node.name, "", filterSpeedupFactor,
Seq(ExecInfo(sqlID, node.name, "", speedupFactor,
duration, node.id, isSupported, None))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.planparser

import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker

import org.apache.spark.sql.execution.ui.SparkPlanGraphNode

/**
 * Produces the [[ExecInfo]] entry for a single plan graph node.
 *
 * @param node    the plan graph node being parsed
 * @param checker used to look up whether the exec is supported and its speedup factor
 * @param sqlID   id of the SQL query the node belongs to
 */
case class RangeExecParser(
    node: SparkPlanGraphNode,
    checker: PluginTypeChecker,
    sqlID: Long) extends ExecParser {

  /** Name used for the checker lookups: the node name with an "Exec" suffix. */
  val fullExecName: String = s"${node.name}Exec"

  /**
   * Builds a one-element sequence describing this node.
   *
   * The speedup factor is only requested from the checker when the exec is
   * reported as supported; otherwise it falls back to 1.
   */
  override def parse: Seq[ExecInfo] = {
    val supported = checker.isExecSupported(fullExecName)
    val speedup = if (supported) checker.getSpeedupFactor(fullExecName) else 1
    // TODO - add in parsing expressions - average speedup across?
    // range doesn't have duration, hence duration = None
    val info = ExecInfo(sqlID, node.name, expr = "", speedup,
      duration = None, node.id, supported, None)
    Seq(info)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,26 @@ object SQLPlanParser extends Logging {
app: AppBase
): Seq[ExecInfo] = {
node match {
case w if (w.name.contains("WholeStageCodegen")) =>
WholeStageExecParser(w.asInstanceOf[SparkPlanGraphCluster], checker, sqlID, app).parse
case c if (c.name == "Coalesce") =>
CoalesceExecParser(c, checker, sqlID).parse
case c if (c.name == "CollectLimit") =>
CollectLimitExecParser(c, checker, sqlID).parse
case e if (e.name == "Expand") =>
ExpandExecParser(e, checker, sqlID).parse
case f if (f.name == "Filter") =>
FilterExecParser(f, checker, sqlID).parse
case p if (p.name == "Project") =>
ProjectExecParser(p, checker, sqlID).parse
case r if (r.name == "Range") =>
RangeExecParser(r, checker, sqlID).parse
case s if (s.name == "Sample") =>
SampleExecParser(s, checker, sqlID).parse
case t if (t.name == "TakeOrderedAndProject") =>
TakeOrderedAndProjectExecParser(t, checker, sqlID).parse
case u if (u.name == "Union") =>
UnionExecParser(u, checker, sqlID).parse
case w if (w.name.contains("WholeStageCodegen")) =>
WholeStageExecParser(w.asInstanceOf[SparkPlanGraphCluster], checker, sqlID, app).parse
case o =>
logDebug(s"other graph node ${node.name} desc: ${node.desc} id: ${node.id}")
ArrayBuffer(ExecInfo(sqlID, o.name, expr = "", 1, duration = None, o.id,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.planparser

import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker

import org.apache.spark.sql.execution.ui.SparkPlanGraphNode

/**
 * Produces the [[ExecInfo]] entry for a single plan graph node.
 *
 * @param node    the plan graph node being parsed
 * @param checker used to look up whether the exec is supported and its speedup factor
 * @param sqlID   id of the SQL query the node belongs to
 */
case class SampleExecParser(
    node: SparkPlanGraphNode,
    checker: PluginTypeChecker,
    sqlID: Long) extends ExecParser {

  /** Name used for the checker lookups: the node name with an "Exec" suffix. */
  val fullExecName: String = s"${node.name}Exec"

  /**
   * Builds a one-element sequence describing this node.
   *
   * The speedup factor is only requested from the checker when the exec is
   * reported as supported; otherwise it falls back to 1.
   */
  override def parse: Seq[ExecInfo] = {
    val supported = checker.isExecSupported(fullExecName)
    val speedup = if (supported) checker.getSpeedupFactor(fullExecName) else 1
    // TODO - add in parsing expressions - average speedup across?
    // sample doesn't have duration, hence duration = None
    val info = ExecInfo(sqlID, node.name, expr = "", speedup,
      duration = None, node.id, supported, None)
    Seq(info)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.planparser

import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker

import org.apache.spark.sql.execution.ui.SparkPlanGraphNode

/**
 * Produces the [[ExecInfo]] entry for a single plan graph node.
 *
 * @param node    the plan graph node being parsed
 * @param checker used to look up whether the exec is supported and its speedup factor
 * @param sqlID   id of the SQL query the node belongs to
 */
case class TakeOrderedAndProjectExecParser(
    node: SparkPlanGraphNode,
    checker: PluginTypeChecker,
    sqlID: Long) extends ExecParser {

  /** Name used for the checker lookups: the node name with an "Exec" suffix. */
  val fullExecName: String = s"${node.name}Exec"

  /**
   * Builds a one-element sequence describing this node.
   *
   * The speedup factor is only requested from the checker when the exec is
   * reported as supported; otherwise it falls back to 1.
   */
  override def parse: Seq[ExecInfo] = {
    val supported = checker.isExecSupported(fullExecName)
    val speedup = if (supported) checker.getSpeedupFactor(fullExecName) else 1
    // TODO - add in parsing expressions - average speedup across?
    // TakeOrderedAndProject doesn't have duration, hence duration = None
    val info = ExecInfo(sqlID, node.name, expr = "", speedup,
      duration = None, node.id, supported, None)
    Seq(info)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.planparser

import com.nvidia.spark.rapids.tool.qualification.PluginTypeChecker

import org.apache.spark.sql.execution.ui.SparkPlanGraphNode

/**
 * Produces the [[ExecInfo]] entry for a single plan graph node.
 *
 * @param node    the plan graph node being parsed
 * @param checker used to look up whether the exec is supported and its speedup factor
 * @param sqlID   id of the SQL query the node belongs to
 */
case class UnionExecParser(
    node: SparkPlanGraphNode,
    checker: PluginTypeChecker,
    sqlID: Long) extends ExecParser {

  /** Name used for the checker lookups: the node name with an "Exec" suffix. */
  val fullExecName: String = s"${node.name}Exec"

  /**
   * Builds a one-element sequence describing this node.
   *
   * The speedup factor is only requested from the checker when the exec is
   * reported as supported; otherwise it falls back to 1.
   */
  override def parse: Seq[ExecInfo] = {
    val supported = checker.isExecSupported(fullExecName)
    val speedup = if (supported) checker.getSpeedupFactor(fullExecName) else 1
    // TODO - add in parsing expressions - average speedup across?
    // Union doesn't have duration, hence duration = None
    val info = ExecInfo(sqlID, node.name, expr = "", speedup,
      duration = None, node.id, supported, None)
    Seq(info)
  }
}
Loading

0 comments on commit a5733d2

Please sign in to comment.