Add Qualification tool support (NVIDIA#2574)
* Qualification tool

Signed-off-by: Thomas Graves <tgraves@apache.org>

* remove unused func

* Add missing files

* Add checks for format option

* cast columns to string to write to text

* Revert "Add checks for format option"

This reverts commit 6f5271c.

* cleanup

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* update output dir

* formatting

* Update help messages

* update app name

* cleanup

* put test functions back

* fix typo
tgravescs authored Jun 4, 2021
1 parent 1ba105e commit aaa7ceb
Showing 37 changed files with 33,446 additions and 486 deletions.
52 changes: 37 additions & 15 deletions rapids-4-spark-tools/README.md
@@ -1,18 +1,41 @@
# Spark profiling tool
# Spark Qualification and Profiling tools

This is a profiling tool to parse and analyze Spark event logs.
It generates information which can be used for debugging and profiling. Information such as Spark version, executor information, Properties and so on.
Works with both cpu and gpu generated event logs.
The qualification tool ranks a set of applications to determine whether the RAPIDS Accelerator for Apache Spark is a good fit for them.

The profiling tool generates information that can be used for debugging and profiling applications, such as the
Spark version, executor information, and properties. It works with both CPU- and GPU-generated event logs.

(The code is based on Apache Spark 3.1.1 source code, and tested using Spark 3.0.x and 3.1.1 event logs)

## Prerequisites
1. Request Spark 3.1.1 or newer installed
1. Requires Spark 3.1.1 or newer to be installed

## How to compile and use with Spark
## How to compile
1. `mvn clean package`
2. Include rapids-4-spark-tools_2.12-<version>.jar in the '--jars' option to spark-shell or spark-submit
3. After starting spark-shell:

## Qualification Tool

### Use from Spark
1. Include rapids-4-spark-tools_2.12-<version>.jar in the '--jars' option to spark-shell or spark-submit
2. After starting spark-shell:

For comparison and analysis of multiple event logs:
```
com.nvidia.spark.rapids.tool.qualification.QualificationMain.main(Array("/path/to/eventlog1", "/path/to/eventlog2"))
```

### How to compile and use from command-line
1. `./bin/spark-submit --class com.nvidia.spark.rapids.tool.qualification.QualificationMain <Spark-Rapids-Repo>/rapids-4-spark-tools/target/rapids-4-spark-tools_2.12-<version>.jar /path/to/eventlog1 /path/to/eventlog2`

### Output
By default, this outputs a file `./rapids_4_spark_qualification.log` that contains the rankings of the applications. The output
location can be changed using the `--output-directory` option. Run with `--help` for more information.
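
The same option can be passed when running from spark-shell; a minimal sketch, assuming `QualificationMain.main` parses `--output-directory` ahead of the event log arguments the same way the command-line entry point does:
```
com.nvidia.spark.rapids.tool.qualification.QualificationMain.main(
  Array("--output-directory", "/tmp/qual-output", "/path/to/eventlog1"))
```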

## Profiling Tool

### Use from Spark
1. Include rapids-4-spark-tools_2.12-<version>.jar in the '--jars' option to spark-shell or spark-submit
2. After starting spark-shell:
For a single event log analysis:
```
com.nvidia.spark.rapids.tool.profiling.ProfileMain.main(Array("/path/to/eventlog1"))
```
@@ -23,12 +46,11 @@ For multiple event logs comparison and analysis:
```
com.nvidia.spark.rapids.tool.profiling.ProfileMain.main(Array("/path/to/eventlog1", "/path/to/eventlog2"))
```

## How to compile and use from command-line
1. `mvn clean package`
2. `cd $SPARK_HOME (Download Apache Spark if required)`
3. `./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain <Spark-Rapids-Repo>/rapids-4-spark-tools/target/rapids-4-spark-tools_2.12-<version>.jar /path/to/eventlog1`
### How to compile and use from command-line
1. `cd $SPARK_HOME` (download Apache Spark if required)
2. `./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain <Spark-Rapids-Repo>/rapids-4-spark-tools/target/rapids-4-spark-tools_2.12-<version>.jar /path/to/eventlog1`

## Options
### Options
```
$ ./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain <Spark-Rapids-Repo>/rapids-4-spark-tools/target/rapids-4-spark-tools_2.12-<version>.jar --help
```
@@ -59,8 +81,8 @@ For usage see below:
```
s3a://<BUCKET>/eventlog1 /path/to/eventlog2
```

## Functions
### A. Collect Information or Compare Information(if more than 1 eventlogs are as input)
### Functions
#### A. Collect Information or Compare Information (if more than one event log is given as input)
- Print Application Information
- Print Executors information
- Print Rapids related parameters
11 changes: 11 additions & 0 deletions rapids-4-spark-tools/pom.xml
@@ -77,6 +77,11 @@
</dependencies>

<build>
<resources>
<resource>
<directory>${project.basedir}/src/main/resources</directory>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
@@ -121,6 +126,12 @@
</includes>
</artifactSet>
<filters>
<filter>
<artifact>org.rogach:scallop_${scala.binary.version}:*</artifact>
<excludes>
<exclude>META-INF/*.MF</exclude>
</excludes>
</filter>
<filter>
<artifact>*:*</artifact>
<excludes>
@@ -18,6 +18,8 @@

package com.nvidia.spark.rapids.tool.profiling

import java.io.FileWriter

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.DataFrame
@@ -27,15 +29,14 @@ import org.apache.spark.sql.rapids.tool.profiling._
* Does analysis on the DataFrames
* from object of ApplicationInfo
*/
class Analysis(apps: ArrayBuffer[ApplicationInfo]) {
class Analysis(apps: ArrayBuffer[ApplicationInfo], fileWriter: Option[FileWriter]) {

require(apps.nonEmpty)
private val fileWriter = apps.head.fileWriter

// Job Level TaskMetrics Aggregation
def jobMetricsAggregation(): Unit = {
if (apps.size == 1) {
fileWriter.write("Job level aggregated task metrics:")
fileWriter.foreach(_.write("Job level aggregated task metrics:"))
apps.head.runQuery(apps.head.jobMetricsAggregationSQL + " order by Duration desc")
} else {
var query = ""
@@ -46,15 +47,15 @@ class Analysis(apps: ArrayBuffer[ApplicationInfo]) {
query += " union " + app.jobMetricsAggregationSQL
}
}
fileWriter.write("Job level aggregated task metrics:")
fileWriter.foreach(_.write("Job level aggregated task metrics:"))
apps.head.runQuery(query + " order by appIndex, Duration desc")
}
}

// Stage Level TaskMetrics Aggregation
def stageMetricsAggregation(): Unit = {
if (apps.size == 1) {
fileWriter.write("Stage level aggregated task metrics:")
fileWriter.foreach(_.write("Stage level aggregated task metrics:"))
apps.head.runQuery(apps.head.stageMetricsAggregationSQL + " order by Duration desc")
} else {
var query = ""
@@ -65,7 +66,7 @@ class Analysis(apps: ArrayBuffer[ApplicationInfo]) {
query += " union " + app.stageMetricsAggregationSQL
}
}
fileWriter.write("Stage level aggregated task metrics:")
fileWriter.foreach(_.write("Stage level aggregated task metrics:"))
apps.head.runQuery(query + " order by appIndex, Duration desc")
}
}
Expand All @@ -84,7 +85,7 @@ class Analysis(apps: ArrayBuffer[ApplicationInfo]) {
query += " union " + app.jobAndStageMetricsAggregationSQL
}
}
fileWriter.write("Job + Stage level aggregated task metrics:")
fileWriter.foreach(_.write("Job + Stage level aggregated task metrics:"))
apps.head.runQuery(query + " order by appIndex, Duration desc")
}
}
@@ -115,7 +116,7 @@ class Analysis(apps: ArrayBuffer[ApplicationInfo]) {

// custom query execution. Normally for debugging use.
def customQueryExecution(app: ApplicationInfo): Unit = {
fileWriter.write("Custom query execution:")
fileWriter.foreach(_.write("Custom query execution:"))
val customQuery =
s"""select stageId from stageDF_${app.index} limit 1
|""".stripMargin
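
The changes above make the output writer optional: `Analysis` now takes an `Option[FileWriter]` instead of pulling one from the first application, and each `fileWriter.write(...)` becomes `fileWriter.foreach(_.write(...))`, so writes are simply skipped when no writer is supplied. A minimal standalone sketch of that pattern (the `Report` class and output path are illustrative, not from the repository):
```
import java.io.FileWriter

// With Some(writer) the line is written; with None the call is a no-op.
class Report(fileWriter: Option[FileWriter]) {
  def emit(line: String): Unit = fileWriter.foreach(_.write(line + "\n"))
}

object ReportDemo {
  def main(args: Array[String]): Unit = {
    val writer = new FileWriter("/tmp/report.txt") // hypothetical output path
    new Report(Some(writer)).emit("Job level aggregated task metrics:")
    new Report(None).emit("dropped: no writer configured")
    writer.close()
  }
}
```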
@@ -55,7 +55,9 @@ case class SQLExecutionCase(
startTime: Long,
endTime: Option[Long],
duration: Option[Long],
durationStr: String)
durationStr: String,
sqlQualDuration: Option[Long],
problematic: String = "")

case class SQLPlanMetricsCase(
sqlID: Long,
@@ -92,7 +94,7 @@ case class JobCase(
startTime: Long,
endTime: Option[Long],
jobResult: Option[String],
failedReason: String,
failedReason: Option[String],
duration: Option[Long],
durationStr: String,
gpuMode: Boolean)
@@ -155,4 +157,6 @@ case class TaskCase(
output_bytesWritten: Long,
output_recordsWritten: Long)

case class ProblematicSQLCase(sqlID: Long, reason: String, desc: String)
case class DatasetSQLCase(sqlID: Long)

case class ProblematicSQLCase(sqlID: Long, reason: String)
@@ -15,7 +15,7 @@
*/
package com.nvidia.spark.rapids.tool.profiling

import java.io.File
import java.io.{File, FileWriter}
import java.util.concurrent.TimeUnit

import scala.collection.mutable
@@ -29,16 +29,16 @@ import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo
* CollectInformation mainly print information based on this event log:
* Such as executors, parameters, etc.
*/
class CollectInformation(apps: ArrayBuffer[ApplicationInfo]) {
class CollectInformation(apps: ArrayBuffer[ApplicationInfo], fileWriter: FileWriter) {

require(apps.nonEmpty)
private val fileWriter = apps.head.fileWriter

// Print Application Information
def printAppInfo(): Unit = {
val messageHeader = "Application Information:\n"
for (app <- apps) {
app.runQuery(query = app.generateAppInfo, writeToFile = true, messageHeader = messageHeader)
app.runQuery(query = app.generateAppInfo, fileWriter = Some(fileWriter),
messageHeader = messageHeader)
}
}

@@ -66,20 +66,20 @@ class CollectInformation(apps: ArrayBuffer[ApplicationInfo]) {
val messageHeader = "\n\nExecutor Information:\n"
for (app <- apps) {
app.runQuery(query = app.generateExecutorInfo + " order by cast(executorID as long)",
writeToFile = true, messageHeader = messageHeader)
fileWriter = Some(fileWriter), messageHeader = messageHeader)
}
}

// Print Rapids related Spark Properties
def printRapidsProperties(): Unit = {
val messageHeader = "\n\nSpark Rapids parameters set explicitly:\n"
for (app <- apps) {
app.runQuery(query = app.generateRapidsProperties + " order by key", writeToFile = true,
messageHeader = messageHeader)
app.runQuery(query = app.generateRapidsProperties + " order by key",
fileWriter = Some(fileWriter), messageHeader = messageHeader)
}
}

def generateDot(): Unit = {
def generateDot(outputDirectory: String): Unit = {
for (app <- apps) {
val requiredDataFrames = Seq("sqlMetricsDF", "driverAccumDF",
"taskStageAccumDF", "taskStageAccumDF")
@@ -95,7 +95,7 @@ class CollectInformation(apps: ArrayBuffer[ApplicationInfo]) {
val list = map.getOrElseUpdate(row.getLong(0), new ArrayBuffer[(Long, Long)]())
list += row.getLong(1) -> row.getLong(2)
}
val outDir = new File(app.args.outputDirectory())
val outDir = new File(outputDirectory)
for ((sqlID, planInfo) <- app.sqlPlan) {
val fileDir = new File(outDir, s"${app.appId}-query-$sqlID")
fileDir.mkdirs()
@@ -113,4 +113,4 @@
}
}
}
}
}
@@ -15,6 +15,8 @@
*/
package com.nvidia.spark.rapids.tool.profiling

import java.io.FileWriter

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.internal.Logging
@@ -23,10 +25,10 @@ import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo
/**
* CompareApplications compares multiple ApplicationInfo objects
*/
class CompareApplications(apps: ArrayBuffer[ApplicationInfo]) extends Logging {
class CompareApplications(apps: ArrayBuffer[ApplicationInfo],
fileWriter: FileWriter) extends Logging {

require(apps.size>1)
private val fileWriter = apps.head.fileWriter

// Compare the App Information.
def compareAppInfo(): Unit = {
@@ -42,7 +44,7 @@ class CompareApplications(apps: ArrayBuffer[ApplicationInfo]) extends Logging {
}
i += 1
}
apps.head.runQuery(query = query, writeToFile = true, messageHeader = messageHeader)
apps.head.runQuery(query = query, fileWriter = Some(fileWriter), messageHeader = messageHeader)
}

// Compare Executors information
@@ -59,7 +61,7 @@ class CompareApplications(apps: ArrayBuffer[ApplicationInfo]) extends Logging {
}
i += 1
}
apps.head.runQuery(query = query, writeToFile = true, messageHeader = messageHeader)
apps.head.runQuery(query = query, fileWriter = Some(fileWriter), messageHeader = messageHeader)
}

// Compare Rapids Properties which are set explicitly
@@ -97,6 +99,6 @@ class CompareApplications(apps: ArrayBuffer[ApplicationInfo]) extends Logging {
query = withClauseAllKeys + selectKeyPart + selectValuePart +
" from (\n" + query + "\n) order by key"
logDebug("Running query " + query)
apps.head.runQuery(query = query, writeToFile = true, messageHeader = messageHeader)
apps.head.runQuery(query = query, fileWriter = Some(fileWriter), messageHeader = messageHeader)
}
}
@@ -20,26 +20,27 @@ import org.rogach.scallop.{ScallopConf, ScallopOption}
class ProfileArgs(arguments: Seq[String]) extends ScallopConf(arguments) {

banner("""
Spark profiling tool
RAPIDS Accelerator for Apache Spark profiling tool
Example:
# Input 1 or more event logs from local path:
./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain
<Spark-Rapids-Repo>/rapids-4-spark-tools/target/rapids-4-spark-tools_2.12-<version>.jar
/path/to/eventlog1 /path/to/eventlog2
rapids-4-spark-tools_2.12-<version>.jar /path/to/eventlog1 /path/to/eventlog2
# Specify a directory of event logs from local path:
./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain
rapids-4-spark-tools_2.12-<version>.jar /path/to/DirOfManyEventLogs
# If any event log is from S3:
export AWS_ACCESS_KEY_ID=xxx
export AWS_SECRET_ACCESS_KEY=xxx
./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain
<Spark-Rapids-Repo>/rapids-4-spark-tools/target/rapids-4-spark-tools_2.12-<version>.jar
s3a://<BUCKET>/eventlog1 /path/to/eventlog2
rapids-4-spark-tools_2.12-<version>.jar s3a://<BUCKET>/eventlog1 /path/to/eventlog2
# Change output directory to /tmp
./bin/spark-submit --class com.nvidia.spark.rapids.tool.profiling.ProfileMain
<Spark-Rapids-Repo>/rapids-4-spark-tools/target/rapids-4-spark-tools_2.12-<version>.jar
-o /tmp /path/to/eventlog1
rapids-4-spark-tools_2.12-<version>.jar -o /tmp /path/to/eventlog1
For usage see below:
""")
@@ -64,4 +65,4 @@
descr = "Generate query visualizations in DOT format. Default is false")

verify()
}
}
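
`ProfileArgs` declares its options with scallop, which derives option names from the `val` names. A hedged sketch of how an option such as `-o`/`--output-directory` is typically declared with `ScallopConf` (the names, default, and descriptions here are assumptions, not copied from the elided portion of this file):
```
import org.rogach.scallop.{ScallopConf, ScallopOption}

class DemoArgs(arguments: Seq[String]) extends ScallopConf(arguments) {
  // scallop maps the val name outputDirectory to --output-directory
  val outputDirectory: ScallopOption[String] = opt[String](
    short = 'o',
    default = Some("."),
    descr = "Directory to write the output to")
  // one or more trailing event log paths
  val eventlog: ScallopOption[List[String]] = trailArg[List[String]](
    descr = "Event log file paths", required = true)
  verify()
}

// e.g. new DemoArgs(Seq("-o", "/tmp", "/path/to/eventlog1")).outputDirectory()
```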