Skip to content

Commit

Permalink
Profiling tool: Add support for health check. (#2632)
Browse files Browse the repository at this point in the history
* Profiling tool: Add support to list failed tasks, jobs, stages and unsupported SQL

Signed-off-by: Niranjan Artal <nartal@nvidia.com>

* add tests for health check features

Signed-off-by: Niranjan Artal <nartal@nvidia.com>
  • Loading branch information
nartal1 authored Jun 8, 2021
1 parent ae33cc4 commit 1229894
Show file tree
Hide file tree
Showing 14 changed files with 41,925 additions and 1 deletion.
29 changes: 29 additions & 0 deletions tools/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,35 @@ Shuffle Skew Check: (When task's Shuffle Read Size > 3 * Avg Stage-level size)
|1 |2 |0 |2224 |1 |222.22 |8.8 |3333.33 |111.11 |0.01 |false |ExceptionFailure(ai.rapids.cudf.CudfException,cuDF failure at: /dddd/xxxxxxx/ccccc/bbbbbbbbb/aaaaaaa|
+--------+-------+--------------+------+-------+---------------+--------------+-----------------+----------------+----------------+----------+----------------------------------------------------------------------------------------------------+
```
#### C. Health Check
```
Application application_1616746343401_0025 (index=1) failed tasks:
+-------+--------------+------+-------+------------------------------------+
|stageId|stageAttemptId|taskId|attempt|endReason_first36char |
+-------+--------------+------+-------+------------------------------------+
|4 |0 |2842 |0 |ExceptionFailure(ai.rapids.cudf.Cudf|
|4 |0 |2858 |0 |TaskKilled(another attempt succeeded|
|4 |0 |2884 |0 |TaskKilled(another attempt succeeded|
|4 |0 |2908 |0 |TaskKilled(another attempt succeeded|
|4 |0 |3410 |1 |ExceptionFailure(ai.rapids.cudf.Cudf|
+-------+--------------+------+-------+------------------------------------+
Application application_1616746343401_0025 (index=1) failed stages:
+-------+---------+-------------------------------------+--------+---------------------------------------------------+
|stageId|attemptId|name |numTasks|failureReason_first100char |
+-------+---------+-------------------------------------+--------+---------------------------------------------------+
|4 |0 |attachTree at Spark300Shims.scala:624|1000 |Job 0 cancelled as part of cancellation of all jobs|
+-------+---------+-------------------------------------+--------+---------------------------------------------------+
Application application_1616746343401_0025 (index=1) failed jobs:
+-----+---------+------------------------------------------------------------------------+
|jobID|jobResult|failedReason_first100char |
+-----+---------+------------------------------------------------------------------------+
|0 |JobFailed|java.lang.Exception: Job 0 cancelled as part of cancellation of all jobs|
+-----+---------+------------------------------------------------------------------------+
```
### How to use this tool
This tool parses the Spark CPU or GPU event log(s) and creates an output report.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,5 @@ case class TaskCase(
case class DatasetSQLCase(sqlID: Long)

case class ProblematicSQLCase(sqlID: Long, reason: String)

case class UnsupportedSQLPlan(sqlID: Long, nodeID: Long, nodeName: String, nodeDesc: String)
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.profiling

import com.nvidia.spark.rapids.tool.ToolTextFileWriter
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo

/**
* HealthCheck defined health check rules
*/
class HealthCheck(apps: ArrayBuffer[ApplicationInfo], textFileWriter:ToolTextFileWriter){

require(apps.nonEmpty)

// Function to list all failed tasks , stages and jobs.
def listFailedJobsStagesTasks(): Unit = {
for (app <- apps) {
// Look for failed tasks.
val tasksMessageHeader = s"Application ${app.appId} (index=${app.index}) failed tasks:\n"
app.runQuery(query = app.getFailedTasks, fileWriter = Some(textFileWriter),
messageHeader = tasksMessageHeader)

// Look for failed stages.
val stagesMessageHeader = s"Application ${app.appId} (index=${app.index}) failed stages:\n"
app.runQuery(query = app.getFailedStages, fileWriter = Some(textFileWriter),
messageHeader = stagesMessageHeader)

// Look for failed jobs.
val jobsMessageHeader = s"Application ${app.appId} (index=${app.index}) failed jobs:\n"
app.runQuery(query = app.getFailedJobs, fileWriter = Some(textFileWriter),
messageHeader = jobsMessageHeader)
}
}

//Function to list all SparkListenerBlockManagerRemoved
def listRemovedBlockManager(): Unit = {
for (app <- apps) {
if (app.allDataFrames.contains(s"blockManagersRemovedDF_${app.index}")) {
val blockManagersMessageHeader =
s"Application ${app.appId} (index=${app.index}) removed BlockManager(s):\n"
app.runQuery(query = app.getblockManagersRemoved, fileWriter = Some(textFileWriter),
messageHeader = blockManagersMessageHeader)
}
}
}

//Function to list all SparkListenerExecutorRemoved
def listRemovedExecutors(): Unit = {
for (app <- apps) {
if (app.allDataFrames.contains(s"executorsRemovedDF_${app.index}")) {
val executorsRemovedMessageHeader =
s"Application ${app.appId} (index=${app.index}) removed Executors(s):\n"
app.runQuery(query = app.getExecutorsRemoved, fileWriter = Some(textFileWriter),
messageHeader = executorsRemovedMessageHeader)
}
}
}

//Function to list all *possible* not-supported plan nodes if GPU Mode=on
def listPossibleUnsupportedSQLPlan(): Unit = {
textFileWriter.write("\nSQL Plan HealthCheck: Not supported SQL Plan\n")
for (app <- apps) {
if (app.allDataFrames.contains(s"sqlDF_${app.index}") && app.sqlPlan.nonEmpty) {
app.runQuery(query = app.unsupportedSQLPlan, fileWriter = Some(textFileWriter),
messageHeader = s"Application ${app.appId} (index=${app.index})\n")
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,13 @@ object ProfileMain extends Logging {
sqlAggMetricsDF.createOrReplaceTempView("sqlAggMetricsDF")
analysis.sqlMetricsAggregationDurationAndCpuTime()
analysis.shuffleSkewCheck()

textFileWriter.write("\n### C. Health Check###\n")
val healthCheck=new HealthCheck(apps, textFileWriter)
healthCheck.listFailedJobsStagesTasks()
healthCheck.listRemovedBlockManager()
healthCheck.listRemovedExecutors()
healthCheck.listPossibleUnsupportedSQLPlan()
}

def logApplicationInfo(app: ApplicationInfo) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ class ApplicationInfo(
var taskGettingResult: ArrayBuffer[SparkListenerTaskGettingResult] =
ArrayBuffer[SparkListenerTaskGettingResult]()

//Unsupported SQL plan
var unsupportedSQLplan: ArrayBuffer[UnsupportedSQLPlan] = ArrayBuffer[UnsupportedSQLPlan]()

// From all other events
var otherEvents: ArrayBuffer[SparkListenerEvent] = ArrayBuffer[SparkListenerEvent]()

Expand Down Expand Up @@ -281,9 +284,13 @@ class ApplicationInfo(
// SQLPlanMetric is a case Class of
// (name: String,accumulatorId: Long,metricType: String)
val allnodes = planGraph.allNodes
for (node <- allnodes){
for (node <- allnodes) {
if (isDataSetPlan(node.desc)) {
datasetSQL += DatasetSQLCase(sqlID)
if (gpuMode) {
val thisPlan = UnsupportedSQLPlan(sqlID, node.id, node.name, node.desc)
unsupportedSQLplan += thisPlan
}
}
// Then process SQL plan metric type
for (metric <- node.metrics){
Expand Down Expand Up @@ -513,6 +520,15 @@ class ApplicationInfo(
} else {
logInfo("No Plan node accums Found. Create an empty Plan node accums DataFrame.")
}

// For unsupportedSQLPlanDF
allDataFrames += (s"unsupportedSQLplan_$index" -> unsupportedSQLplan.toDF)
if (unsupportedSQLplan.nonEmpty) {
logInfo(s"Total ${unsupportedSQLplan.size} Plan node accums for appID=$appId")
} else {
logInfo("No unSupportedSQLPlan node accums Found. " +
"Create an empty node accums DataFrame.")
}
}

for ((name, df) <- this.allDataFrames) {
Expand Down Expand Up @@ -730,6 +746,54 @@ class ApplicationInfo(
|""".stripMargin
}

def getFailedTasks: String = {
s"""select stageId, stageAttemptId, taskId, attempt,
|substr(endReason, 1, 36) as endReason_first36char
|from taskDF_$index
|where successful = false
|order by stageId, stageAttemptId, taskId, attempt
|""".stripMargin
}

def getFailedStages: String = {
s"""select stageId, attemptId, name, numTasks,
|substr(failureReason, 1, 100) as failureReason_first100char
|from stageDF_$index
|where failureReason is not null
|order by stageId, attemptId
|""".stripMargin
}

def getFailedJobs: String = {
s"""select jobID, jobResult,
|substr(failedReason, 1, 100) as failedReason_first100char
|from jobDF_$index
|where jobResult <> 'JobSucceeded'
|order by jobID
|""".stripMargin
}

def getblockManagersRemoved: String = {
s"""select executorID, time
|from blockManagersRemovedDF_$index
|order by cast(executorID as long)
|""".stripMargin
}

def getExecutorsRemoved: String = {
s"""select executorID, time,
|substr(reason, 1, 32) reason_first32char
|from executorsRemovedDF_$index
|order by cast(executorID as long)
|""".stripMargin
}

def unsupportedSQLPlan: String = {
s"""select sqlID, nodeID, nodeName,
|substr(nodeDesc, 1, 100) nodeDesc_first100char
|from unsupportedSQLplan_$index""".stripMargin
}

def qualificationDurationNoMetricsSQL: String = {
s"""select
|first(appName) as `App Name`,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
executorID,time,reason_first32char
2,1617037387190,Remote RPC client disassociated.
7,1617037412432,Remote RPC client disassociated.
8,1617037438591,Remote RPC client disassociated.
9,1617037462939,Remote RPC client disassociated.
10,1617037488244,Remote RPC client disassociated.
11,1617037515083,Remote RPC client disassociated.
12,1617037538848,Remote RPC client disassociated.
13,1617037564194,Remote RPC client disassociated.
14,1617037590247,Remote RPC client disassociated.
15,1617037615001,Remote RPC client disassociated.
16,1617037640402,Remote RPC client disassociated.
17,1617037666807,Remote RPC client disassociated.
18,1617037691151,Remote RPC client disassociated.
19,1617037716507,Remote RPC client disassociated.
20,1617037741911,Remote RPC client disassociated.
21,1617037768953,Remote RPC client disassociated.
22,1617037792863,Remote RPC client disassociated.
23,1617037818568,Remote RPC client disassociated.
24,1617037843567,Remote RPC client disassociated.
25,1617037868886,Remote RPC client disassociated.
26,1617037895173,Remote RPC client disassociated.
27,1617037919270,Remote RPC client disassociated.
28,1617037944623,Remote RPC client disassociated.
29,1617037972091,Remote RPC client disassociated.
30,1617037996433,Remote RPC client disassociated.
31,1617038021679,Remote RPC client disassociated.
32,1617038048179,Remote RPC client disassociated.
33,1617038072334,Remote RPC client disassociated.
34,1617038097524,Remote RPC client disassociated.
35,1617038123139,Remote RPC client disassociated.
36,1617038148203,Remote RPC client disassociated.
37,1617038174089,Remote RPC client disassociated.
38,1617038199599,Remote RPC client disassociated.
39,1617038224782,Remote RPC client disassociated.
40,1617038250285,Remote RPC client disassociated.
41,1617038275657,Remote RPC client disassociated.
42,1617038300960,Remote RPC client disassociated.
43,1617038326300,Remote RPC client disassociated.
44,1617038352154,Remote RPC client disassociated.
45,1617038377198,Remote RPC client disassociated.
46,1617038403660,Remote RPC client disassociated.
47,1617038428202,Remote RPC client disassociated.
48,1617038453205,Remote RPC client disassociated.
49,1617038478566,Remote RPC client disassociated.
50,1617038504957,Remote RPC client disassociated.
51,1617038529208,Remote RPC client disassociated.
52,1617038554615,Remote RPC client disassociated.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
jobID,jobResult,failedReason_first100char
0,JobFailed,java.lang.Exception: Job 0 cancelled as part of cancellation of all jobs

Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
executorID,time
2,1617037387197
7,1617037412433
8,1617037438591
9,1617037462940
10,1617037488244
11,1617037515089
12,1617037538848
13,1617037564194
14,1617037590247
15,1617037615002
16,1617037640403
17,1617037666807
18,1617037691152
19,1617037716507
20,1617037741911
21,1617037768953
22,1617037792863
23,1617037818568
24,1617037843568
25,1617037868886
26,1617037895174
27,1617037919271
28,1617037944623
29,1617037972091
30,1617037996433
31,1617038021679
32,1617038048180
33,1617038072335
34,1617038097525
35,1617038123139
36,1617038148204
37,1617038174089
38,1617038199599
39,1617038224782
40,1617038250286
41,1617038275657
42,1617038300960
43,1617038326300
44,1617038352154
45,1617038377198
46,1617038403661
47,1617038428202
48,1617038453205
49,1617038478566
50,1617038504957
51,1617038529208
52,1617038554615

Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
stageId,attemptId,name,numTasks,failureReason_first100char
4,0,attachTree at Spark300Shims.scala:624,1000,Job 0 cancelled as part of cancellation of all jobs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
stageId,stageAttemptId,taskId,attempt,endReason_first36char
4,0,2842,0,ExceptionFailure(ai.rapids.cudf.Cudf
4,0,2858,0,TaskKilled(another attempt succeeded
4,0,2884,0,TaskKilled(another attempt succeeded
4,0,2908,0,TaskKilled(another attempt succeeded
4,0,3410,1,ExceptionFailure(ai.rapids.cudf.Cudf
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
sqlID,nodeID,nodeName,nodeDesc
0,3,MapElements,MapElements com.nvidia.spark.rapids.tool.profiling.QualificationInfoSuite$$$Lambda$1571/993650587@7b
0,4,Filter,Filter com.nvidia.spark.rapids.tool.profiling.QualificationInfoSuite$$$Lambda$1569/1828787392@2eb6d3
34,601 changes: 34,601 additions & 0 deletions tools/src/test/resources/spark-events-profiling/executors_removed_eventlog

Large diffs are not rendered by default.

6,894 changes: 6,894 additions & 0 deletions tools/src/test/resources/spark-events-profiling/task_job_failure_eventlog

Large diffs are not rendered by default.

Loading

0 comments on commit 1229894

Please sign in to comment.