Report equivalent stages/sql ids as a part of compare #2793

Merged · 3 commits · Jun 28, 2021
44 changes: 44 additions & 0 deletions tools/README.md
@@ -265,6 +265,50 @@ Compare Executor Information:
+--------+-----------------+------------+-------------+-----------+------------+-------------+-------------+--------------+------------------+---------------+-------+-------+
```

- Matching SQL IDs Across Applications:
```
Matching SQL IDs Across Applications:
+-----------------------+-----------------------+
|app-20210329165943-0103|app-20210329170243-0018|
+-----------------------+-----------------------+
|0                      |0                      |
|1                      |1                      |
|2                      |2                      |
|3                      |3                      |
|4                      |4                      |
+-----------------------+-----------------------+
```

Review comment on this table (Collaborator): would it be helpful to also add time each took? I guess we can always add more later.

Reply (Collaborator, Author): Ya I thought about that, but I didn't want to make it too complicated to start out with.

There is one column per application and one row per SQL ID. The SQL IDs are matched
primarily on the structure of the SQL query that was run, and then on the order in which
the queries were run. Be aware that the match is purely structural: two queries that do
similar things, but on different data, are likely to match as the same query. An effort
is also made to match CPU plans against GPU plans, so in most cases the same query run on
the CPU and on the GPU will match.
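
For intuition, the following is a minimal sketch of that pairing idea, added here for illustration and not part of this PR's diff. `NormalizedPlan` and the plan-shape strings are invented stand-ins for the normalized Spark plans the tool actually compares; each query is paired with the first still-unmatched query in the other application whose normalized plan is equal, preserving run order.

```scala
import scala.collection.mutable

object SqlIdMatchSketch {
  // Invented stand-in for a normalized plan: only the query structure,
  // with data-specific details (paths, literals) already stripped.
  case class NormalizedPlan(shape: String)

  // Pair each SQL ID in appA with the first still-unmatched SQL ID in appB
  // whose normalized plan is structurally equal, preserving run order.
  def matchSqlIds(
      appA: Seq[(Long, NormalizedPlan)],
      appB: Seq[(Long, NormalizedPlan)]): Seq[(Long, Long)] = {
    val remaining = mutable.Buffer(appB: _*)
    appA.flatMap { case (idA, planA) =>
      remaining.find { case (_, planB) => planB == planA }.map { matched =>
        remaining -= matched // each SQL ID can be matched at most once
        (idA, matched._1)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val cpuRun = Seq(0L -> NormalizedPlan("Scan>Filter>Agg"), 1L -> NormalizedPlan("Scan>Join"))
    val gpuRun = Seq(0L -> NormalizedPlan("Scan>Filter>Agg"), 1L -> NormalizedPlan("Scan>Join"))
    println(matchSqlIds(cpuRun, gpuRun)) // pairs (0,0) and (1,1)
  }
}
```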

- Matching Stage IDs Across Applications:
```
Matching Stage IDs Across Applications:
+-----------------------+-----------------------+
|app-20210329165943-0103|app-20210329170243-0018|
+-----------------------+-----------------------+
|31                     |31                     |
|32                     |32                     |
|33                     |33                     |
|39                     |38                     |
|40                     |40                     |
|41                     |41                     |
+-----------------------+-----------------------+
```

There is one column per application and one row per stage ID. If a SQL query matches
between applications (see Matching SQL IDs Across Applications above), then an attempt is
made to match the stages within that query to each other. This matching has the same
limitations that stage matching has when generating a dot graph. It can be especially
helpful when comparing large queries where Spark happened to assign the stage IDs slightly
differently, or where there is a different number of stages because of slight differences
in the plan. This is a best effort, and it is not guaranteed to match up all stages in a plan.
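
As a small illustration (again not part of this PR's diff) of how two structurally matched plans yield matched stages, in the same spirit as the `depthFirstStages.zip(...)` logic added to CompareApplications below: the stage lists here are invented example data, one entry per plan node in depth-first order, and zipping them pairs the stages at aligned nodes. Note how it reproduces the 39/38 pairing from the example table above.

```scala
object StagePairSketch {
  def main(args: Array[String]): Unit = {
    // Invented example data: per-node stage IDs, in depth-first order, for two
    // plans that already matched structurally (None = node has no stage).
    val sourceStages: Seq[Option[Int]] = Seq(Some(31), Some(32), None, Some(39), Some(40))
    val probeStages: Seq[Option[Int]] = Seq(Some(31), Some(32), None, Some(38), Some(40))

    // Zip the aligned node lists and keep only positions where both plans
    // actually have a stage attached.
    val stagePairs = sourceStages.zip(probeStages).collect {
      case (Some(src), Some(probe)) => (src, probe)
    }.distinct

    println(stagePairs) // List((31,31), (32,32), (39,38), (40,40))
  }
}
```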

- Compare Rapids related Spark properties side-by-side:
```
Compare Rapids Properties which are set explicitly:
@@ -16,15 +16,10 @@

package com.nvidia.spark.rapids.tool.profiling

import java.util.concurrent.TimeUnit

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import com.nvidia.spark.rapids.tool.ToolTextFileWriter

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo

case class StageMetrics(numTasks: Int, duration: String)
@@ -125,102 +120,16 @@ class CollectInformation(apps: Seq[ApplicationInfo],
}
}

def generateDot(outputDirectory: String, accumsOpt: Option[DataFrame]): Unit = {
for (app <- apps) {
val requiredDataFrames = Seq("sqlMetricsDF", "driverAccumDF",
"taskStageAccumDF", "taskStageAccumDF")
.map(name => s"${name}_${app.index}")
if (requiredDataFrames.forall(app.allDataFrames.contains)) {
val start = System.nanoTime()
val accums = accumsOpt.getOrElse(app.runQuery(app.generateSQLAccums))

val accumIdToStageId = app.runQuery(
s"""
| select
| stageId,
| accumulatorId
| from taskStageAccumDF_${app.index}
| """.stripMargin).groupBy("accumulatorId").agg(
max(col("stageId")).alias("stageId")).collect().map { row =>
(row.getLong(0), row.getInt(1))
}.toMap

val formatter = java.text.NumberFormat.getIntegerInstance

val stageIdToStageMetrics = app.runQuery(
s"""
|select
| stageId,
| duration
| from taskDF_${app.index}
|""".stripMargin).groupBy(col("stageId"))
.agg(min(col("duration")).alias("min_dur"),
max(col("duration")).alias("max_dur"),
mean(col("duration")).alias("mean_dur"),
count(col("duration")).alias("num_tasks"))
.collect().map { row =>
val stageId = row.getInt(0)
val minDur = row.getLong(1)
val maxDur = row.getLong(2)
val meanDur = row.getDouble(3)
val numTasks = row.getLong(4)
(stageId, StageMetrics(numTasks.toInt,
s"MIN: ${formatter.format(minDur)} ms " +
s"MAX: ${formatter.format(maxDur)} ms " +
s"AVG: ${formatter.format(meanDur)} ms"))
}.toMap

val accumSummary = accums
.select(col("sqlId"), col("accumulatorId"), col("max_value"))
.collect()
val sqlIdToMaxMetric = new mutable.HashMap[Long, ArrayBuffer[(Long,Long)]]()
for (row <- accumSummary) {
val list = sqlIdToMaxMetric.getOrElseUpdate(row.getLong(0),
new ArrayBuffer[(Long, Long)]())
list += row.getLong(1) -> row.getLong(2)
}

val sqlPlansMap = app.sqlPlan.map { case (sqlId, sparkPlanInfo) =>
sqlId -> ((sparkPlanInfo, app.physicalPlanDescription(sqlId)))
}
for ((sqlID, (planInfo, physicalPlan)) <- sqlPlansMap) {
val dotFileWriter = new ToolTextFileWriter(outputDirectory,
s"${app.appId}-query-$sqlID.dot")
try {
val metrics = sqlIdToMaxMetric.getOrElse(sqlID, Seq.empty).toMap
GenerateDot.writeDotGraph(QueryPlanWithMetrics(planInfo, metrics),
physicalPlan, accumIdToStageId, stageIdToStageMetrics, dotFileWriter,
sqlID, app.appId)
} finally {
dotFileWriter.close()
}
}

val duration = TimeUnit.SECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS)
fileWriter.foreach(_.write(s"Generated DOT graphs for app ${app.appId} " +
s"to $outputDirectory in $duration second(s)\n"))
} else {
val missingDataFrames = requiredDataFrames.filterNot(app.allDataFrames.contains)
fileWriter.foreach(_.write(s"Could not generate DOT graph for app ${app.appId} " +
s"because of missing data frames: ${missingDataFrames.mkString(", ")}\n"))
}
}
}

// Print SQL Plan Metrics
def printSQLPlanMetrics(shouldGenDot: Boolean, outputDir: String): Unit = {
def printSQLPlanMetrics(): Unit = {
for (app <- apps){
if (app.allDataFrames.contains(s"sqlMetricsDF_${app.index}") &&
app.allDataFrames.contains(s"driverAccumDF_${app.index}") &&
app.allDataFrames.contains(s"taskStageAccumDF_${app.index}") &&
app.allDataFrames.contains(s"jobDF_${app.index}") &&
app.allDataFrames.contains(s"sqlDF_${app.index}")) {
val messageHeader = "\nSQL Plan Metrics for Application:\n"
val accums = app.runQuery(app.generateSQLAccums, fileWriter = fileWriter,
messageHeader=messageHeader)
if (shouldGenDot) {
generateDot(outputDir, Some(accums))
}
app.runQuery(app.generateSQLAccums, fileWriter = fileWriter, messageHeader=messageHeader)
}
}
}
@@ -16,10 +16,16 @@

package com.nvidia.spark.rapids.tool.profiling

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import com.nvidia.spark.rapids.tool.ToolTextFileWriter

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo
import org.apache.spark.sql.Row
import org.apache.spark.sql.rapids.tool.profiling.{ApplicationInfo, SparkPlanInfoWithStage}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

/**
* CompareApplications compares multiple ApplicationInfo objects
@@ -29,6 +35,114 @@ class CompareApplications(apps: Seq[ApplicationInfo],

require(apps.size>1)

def findMatchingStages(): Unit = {
val normalizedByAppId = apps.map { app =>
val normalized = app.sqlPlan.mapValues { plan =>
SparkPlanInfoWithStage(plan, app.accumIdToStageId).normalizeForStageComparison
}
(app.appId, normalized)
}.toMap

val appIdToSortedSqlIds = mutable.Map[String, mutable.Buffer[Long]]()
appIdToSortedSqlIds ++= normalizedByAppId.mapValues { sqlIdToPlan =>
sqlIdToPlan.keys.toList.sorted.toBuffer
}

// Each line holds a map with app id/sql id that match each other
val matchingSqlIds = new ArrayBuffer[mutable.HashMap[String, Long]]()
val matchingStageIds = new ArrayBuffer[mutable.HashMap[String, Int]]()

while (appIdToSortedSqlIds.nonEmpty) {
val appIds = appIdToSortedSqlIds.keys.toSeq.sorted
val sourceAppId = appIds.head
val sourceSqlId = appIdToSortedSqlIds(sourceAppId).head
val sourcePlan = normalizedByAppId(sourceAppId)(sourceSqlId)

val sqlMatches = mutable.HashMap[String, Long]()
sqlMatches(sourceAppId) = sourceSqlId
// The key is the stage for the source app id. The values are pairs of appid/stage
// for the matching stages in other apps
val stageMatches = new mutable.HashMap[Int, mutable.Buffer[(String, Int)]]()
sourcePlan.depthFirstStages.distinct.flatten.foreach { stage =>
stageMatches(stage) = new mutable.ArrayBuffer[(String, Int)]()
}

// Now we want to find the first plan in each app that matches. The sorting is
// because we assume that the SQL commands are run in the same order, so it should
// make it simpler to find them.
appIds.slice(1, appIds.length).foreach { probeAppId =>
var matchForProbedApp: Option[Long] = None
appIdToSortedSqlIds(probeAppId).foreach { probeSqlId =>
if (matchForProbedApp.isEmpty) {
val probePlan = normalizedByAppId(probeAppId)(probeSqlId)
if (probePlan.equals(sourcePlan)) {
sourcePlan.depthFirstStages.zip(probePlan.depthFirstStages).filter {
case (a, b) => a.isDefined && b.isDefined
}.distinct.foreach {
case (sourceStageId, probeStageId) =>
stageMatches(sourceStageId.get).append((probeAppId, probeStageId.get))
}
matchForProbedApp = Some(probeSqlId)
}
}
}

matchForProbedApp.foreach { foundId =>
sqlMatches(probeAppId) = foundId
}
}

stageMatches.toSeq.sortWith {
case (a, b) => a._1 < b._1
}.foreach {
case (sourceStage, others) =>
val ret = mutable.HashMap[String, Int]()
ret(sourceAppId) = sourceStage
others.foreach {
case (appId, stageId) => ret(appId) = stageId
}
matchingStageIds.append(ret)
}

// Remove the matches from the data structures
sqlMatches.foreach {
case (appId, sqlId) =>
appIdToSortedSqlIds(appId) -= sqlId
if (appIdToSortedSqlIds(appId).isEmpty) {
appIdToSortedSqlIds.remove(appId)
}
}

matchingSqlIds += sqlMatches
}

val outputAppIds = normalizedByAppId.keys.toSeq.sorted

val matchingSqlData = matchingSqlIds.map { info =>
Row(outputAppIds.map { appId =>
info.get(appId).map(_.toString).getOrElse("")
}: _*)
}.toList.asJava

val matchingType = StructType(outputAppIds.map(id => StructField(id, StringType)))

apps.head.writeAsDF(matchingSqlData,
matchingType,
"\n\nMatching SQL IDs Across Applications:\n",
fileWriter)

val matchingStageData = matchingStageIds.map { info =>
Row(outputAppIds.map { appId =>
info.get(appId).map(_.toString).getOrElse("")
}: _*)
}.toList.asJava

apps.head.writeAsDF(matchingStageData,
matchingType,
"\n\nMatching Stage IDs Across Applications:\n",
fileWriter)
}

// Compare the App Information.
def compareAppInfo(): Unit = {
val messageHeader = "\n\nCompare Application Information:\n"