Profiling tool: Add support for health check. #2632

Merged: 3 commits, Jun 8, 2021
29 changes: 29 additions & 0 deletions tools/README.md
@@ -334,6 +334,35 @@ Shuffle Skew Check: (When task's Shuffle Read Size > 3 * Avg Stage-level size)
|1 |2 |0 |2224 |1 |222.22 |8.8 |3333.33 |111.11 |0.01 |false |ExceptionFailure(ai.rapids.cudf.CudfException,cuDF failure at: /dddd/xxxxxxx/ccccc/bbbbbbbbb/aaaaaaa|
+--------+-------+--------------+------+-------+---------------+--------------+-----------------+----------------+----------------+----------+----------------------------------------------------------------------------------------------------+
```
#### C. Health Check
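The health check lists failed tasks, stages and jobs, removed BlockManagers and executors, and potentially unsupported SQL plan nodes. For example: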

```
Application application_1616746343401_0025 (index=1) failed tasks:
+-------+--------------+------+-------+------------------------------------+
|stageId|stageAttemptId|taskId|attempt|endReason_first36char |
(Review comment from a Collaborator: any reason to use 36 chars here and 100 below? Seems like we could use 100 in both. Maybe put "..." at the end of the string if it's longer.)
+-------+--------------+------+-------+------------------------------------+
|4 |0 |2842 |0 |ExceptionFailure(ai.rapids.cudf.Cudf|
|4 |0 |2858 |0 |TaskKilled(another attempt succeeded|
|4 |0 |2884 |0 |TaskKilled(another attempt succeeded|
|4 |0 |2908 |0 |TaskKilled(another attempt succeeded|
|4 |0 |3410 |1 |ExceptionFailure(ai.rapids.cudf.Cudf|
+-------+--------------+------+-------+------------------------------------+

Application application_1616746343401_0025 (index=1) failed stages:
+-------+---------+-------------------------------------+--------+---------------------------------------------------+
|stageId|attemptId|name |numTasks|failureReason_first100char |
+-------+---------+-------------------------------------+--------+---------------------------------------------------+
|4 |0 |attachTree at Spark300Shims.scala:624|1000 |Job 0 cancelled as part of cancellation of all jobs|
+-------+---------+-------------------------------------+--------+---------------------------------------------------+

Application application_1616746343401_0025 (index=1) failed jobs:
+-----+---------+------------------------------------------------------------------------+
|jobID|jobResult|failedReason_first100char |
+-----+---------+------------------------------------------------------------------------+
|0 |JobFailed|java.lang.Exception: Job 0 cancelled as part of cancellation of all jobs|
+-----+---------+------------------------------------------------------------------------+
```
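As a minimal sketch of the truncation suggestion in the review comment above (a hypothetical helper, not part of this PR), the select-list expression could append "..." only when the value is actually longer:

```
// Hypothetical helper, not in this PR: build a SQL select-list expression that
// truncates `col` to `len` characters, appends "..." when the value is longer,
// and aliases it the way the health-check queries do.
def truncateWithEllipsis(col: String, len: Int): String =
  s"case when length($col) > $len " +
  s"then concat(substr($col, 1, $len), '...') else $col end " +
  s"as ${col}_first${len}char"
```

For example, `truncateWithEllipsis("endReason", 100)` could replace the `substr(endReason, 1, 36)` projection in `getFailedTasks`.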


### How to use this tool
This tool parses the Spark CPU or GPU event log(s) and creates an output report.
@@ -169,3 +169,5 @@ case class TaskCase(
case class DatasetSQLCase(sqlID: Long)

case class ProblematicSQLCase(sqlID: Long, reason: String)

case class UnsupportedSQLPlan(sqlID: Long, nodeID: Long, nodeName: String, nodeDesc: String)
@@ -0,0 +1,85 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.profiling

import com.nvidia.spark.rapids.tool.ToolTextFileWriter
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo

/**
* HealthCheck defines the health check rules
*/
class HealthCheck(apps: ArrayBuffer[ApplicationInfo], textFileWriter: ToolTextFileWriter) {
// Review comment (Collaborator): nit: space after ':'

require(apps.nonEmpty)

// Function to list all failed tasks, stages, and jobs.
def listFailedJobsStagesTasks(): Unit = {
for (app <- apps) {
// Look for failed tasks.
val tasksMessageHeader = s"Application ${app.appId} (index=${app.index}) failed tasks:\n"
// Review comment (Collaborator): I think the rest of the headers leave off the
// appId and app index; we should be consistent about that.
app.runQuery(query = app.getFailedTasks, fileWriter = Some(textFileWriter),
messageHeader = tasksMessageHeader)

// Look for failed stages.
val stagesMessageHeader = s"Application ${app.appId} (index=${app.index}) failed stages:\n"
app.runQuery(query = app.getFailedStages, fileWriter = Some(textFileWriter),
messageHeader = stagesMessageHeader)

// Look for failed jobs.
val jobsMessageHeader = s"Application ${app.appId} (index=${app.index}) failed jobs:\n"
app.runQuery(query = app.getFailedJobs, fileWriter = Some(textFileWriter),
messageHeader = jobsMessageHeader)
}
}

// Function to list all SparkListenerBlockManagerRemoved
def listRemovedBlockManager(): Unit = {
for (app <- apps) {
if (app.allDataFrames.contains(s"blockManagersRemovedDF_${app.index}")) {
val blockManagersMessageHeader =
s"Application ${app.appId} (index=${app.index}) removed BlockManager(s):\n"
app.runQuery(query = app.getblockManagersRemoved, fileWriter = Some(textFileWriter),
messageHeader = blockManagersMessageHeader)
}
}
}

// Function to list all SparkListenerExecutorRemoved
def listRemovedExecutors(): Unit = {
for (app <- apps) {
if (app.allDataFrames.contains(s"executorsRemovedDF_${app.index}")) {
val executorsRemovedMessageHeader =
s"Application ${app.appId} (index=${app.index}) removed Executors(s):\n"
app.runQuery(query = app.getExecutorsRemoved, fileWriter = Some(textFileWriter),
messageHeader = executorsRemovedMessageHeader)
}
}
}

// Function to list all *possible* unsupported plan nodes when GPU mode is on
def listPossibleUnsupportedSQLPlan(): Unit = {
textFileWriter.write("\nSQL Plan HealthCheck: Not supported SQL Plan\n")
for (app <- apps) {
if (app.allDataFrames.contains(s"sqlDF_${app.index}") && app.sqlPlan.nonEmpty) {
app.runQuery(query = app.unsupportedSQLPlan, fileWriter = Some(textFileWriter),
messageHeader = s"Application ${app.appId} (index=${app.index})\n")
}
}
}
}
@@ -148,6 +148,13 @@ object ProfileMain extends Logging {
sqlAggMetricsDF.createOrReplaceTempView("sqlAggMetricsDF")
analysis.sqlMetricsAggregationDurationAndCpuTime()
analysis.shuffleSkewCheck()

textFileWriter.write("\n### C. Health Check ###\n")
val healthCheck = new HealthCheck(apps, textFileWriter)
healthCheck.listFailedJobsStagesTasks()
healthCheck.listRemovedBlockManager()
healthCheck.listRemovedExecutors()
healthCheck.listPossibleUnsupportedSQLPlan()
}

def logApplicationInfo(app: ApplicationInfo) = {
@@ -140,6 +140,9 @@ class ApplicationInfo(
var taskGettingResult: ArrayBuffer[SparkListenerTaskGettingResult] =
ArrayBuffer[SparkListenerTaskGettingResult]()

// Unsupported SQL plan
// Review comment (Collaborator): nit: space after //
var unsupportedSQLplan: ArrayBuffer[UnsupportedSQLPlan] = ArrayBuffer[UnsupportedSQLPlan]()

// From all other events
var otherEvents: ArrayBuffer[SparkListenerEvent] = ArrayBuffer[SparkListenerEvent]()

@@ -281,9 +284,13 @@
// SQLPlanMetric is a case Class of
// (name: String,accumulatorId: Long,metricType: String)
val allnodes = planGraph.allNodes
for (node <- allnodes){
for (node <- allnodes) {
if (isDataSetPlan(node.desc)) {
datasetSQL += DatasetSQLCase(sqlID)
if (gpuMode) {
val thisPlan = UnsupportedSQLPlan(sqlID, node.id, node.name, node.desc)
unsupportedSQLplan += thisPlan
}
}
// Then process SQL plan metric type
for (metric <- node.metrics){
@@ -513,6 +520,15 @@
} else {
logInfo("No Plan node accums Found. Create an empty Plan node accums DataFrame.")
}

// For unsupportedSQLPlanDF
allDataFrames += (s"unsupportedSQLplan_$index" -> unsupportedSQLplan.toDF)
if (unsupportedSQLplan.nonEmpty) {
logInfo(s"Total ${unsupportedSQLplan.size} Plan node accums for appID=$appId")
// Review comment (Collaborator): "Plan node accums" is copy-pasted from above;
// change it to something like unsupported ops.
} else {
logInfo("No unSupportedSQLPlan node accums Found. " +
"Create an empty node accums DataFrame.")
}
}

for ((name, df) <- this.allDataFrames) {
@@ -730,6 +746,54 @@
|""".stripMargin
}

def getFailedTasks: String = {
s"""select stageId, stageAttemptId, taskId, attempt,
|substr(endReason, 1, 36) as endReason_first36char
|from taskDF_$index
|where successful = false
|order by stageId, stageAttemptId, taskId, attempt
|""".stripMargin
}

def getFailedStages: String = {
s"""select stageId, attemptId, name, numTasks,
|substr(failureReason, 1, 100) as failureReason_first100char
|from stageDF_$index
|where failureReason is not null
|order by stageId, attemptId
|""".stripMargin
}

def getFailedJobs: String = {
s"""select jobID, jobResult,
|substr(failedReason, 1, 100) as failedReason_first100char
|from jobDF_$index
|where jobResult <> 'JobSucceeded'
|order by jobID
|""".stripMargin
}
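
// For a concrete sense of what these builders produce, the getFailedJobs string
// above renders (for index = 1, after stripMargin removes the '|' prefixes) as:
//
//   select jobID, jobResult,
//   substr(failedReason, 1, 100) as failedReason_first100char
//   from jobDF_1
//   where jobResult <> 'JobSucceeded'
//   order by jobID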

def getblockManagersRemoved: String = {
s"""select executorID, time
|from blockManagersRemovedDF_$index
|order by cast(executorID as long)
|""".stripMargin
}

def getExecutorsRemoved: String = {
s"""select executorID, time,
|substr(reason, 1, 32) reason_first32char
|from executorsRemovedDF_$index
|order by cast(executorID as long)
|""".stripMargin
}

def unsupportedSQLPlan: String = {
s"""select sqlID, nodeID, nodeName,
|substr(nodeDesc, 1, 100) nodeDesc_first100char
|from unsupportedSQLplan_$index""".stripMargin
}

def qualificationDurationNoMetricsSQL: String = {
s"""select
|first(appName) as `App Name`,
@@ -0,0 +1,48 @@
executorID,time,reason_first32char
2,1617037387190,Remote RPC client disassociated.
7,1617037412432,Remote RPC client disassociated.
8,1617037438591,Remote RPC client disassociated.
9,1617037462939,Remote RPC client disassociated.
10,1617037488244,Remote RPC client disassociated.
11,1617037515083,Remote RPC client disassociated.
12,1617037538848,Remote RPC client disassociated.
13,1617037564194,Remote RPC client disassociated.
14,1617037590247,Remote RPC client disassociated.
15,1617037615001,Remote RPC client disassociated.
16,1617037640402,Remote RPC client disassociated.
17,1617037666807,Remote RPC client disassociated.
18,1617037691151,Remote RPC client disassociated.
19,1617037716507,Remote RPC client disassociated.
20,1617037741911,Remote RPC client disassociated.
21,1617037768953,Remote RPC client disassociated.
22,1617037792863,Remote RPC client disassociated.
23,1617037818568,Remote RPC client disassociated.
24,1617037843567,Remote RPC client disassociated.
25,1617037868886,Remote RPC client disassociated.
26,1617037895173,Remote RPC client disassociated.
27,1617037919270,Remote RPC client disassociated.
28,1617037944623,Remote RPC client disassociated.
29,1617037972091,Remote RPC client disassociated.
30,1617037996433,Remote RPC client disassociated.
31,1617038021679,Remote RPC client disassociated.
32,1617038048179,Remote RPC client disassociated.
33,1617038072334,Remote RPC client disassociated.
34,1617038097524,Remote RPC client disassociated.
35,1617038123139,Remote RPC client disassociated.
36,1617038148203,Remote RPC client disassociated.
37,1617038174089,Remote RPC client disassociated.
38,1617038199599,Remote RPC client disassociated.
39,1617038224782,Remote RPC client disassociated.
40,1617038250285,Remote RPC client disassociated.
41,1617038275657,Remote RPC client disassociated.
42,1617038300960,Remote RPC client disassociated.
43,1617038326300,Remote RPC client disassociated.
44,1617038352154,Remote RPC client disassociated.
45,1617038377198,Remote RPC client disassociated.
46,1617038403660,Remote RPC client disassociated.
47,1617038428202,Remote RPC client disassociated.
48,1617038453205,Remote RPC client disassociated.
49,1617038478566,Remote RPC client disassociated.
50,1617038504957,Remote RPC client disassociated.
51,1617038529208,Remote RPC client disassociated.
52,1617038554615,Remote RPC client disassociated.
@@ -0,0 +1,3 @@
jobID,jobResult,failedReason_first100char
0,JobFailed,java.lang.Exception: Job 0 cancelled as part of cancellation of all jobs

@@ -0,0 +1,49 @@
executorID,time
2,1617037387197
7,1617037412433
8,1617037438591
9,1617037462940
10,1617037488244
11,1617037515089
12,1617037538848
13,1617037564194
14,1617037590247
15,1617037615002
16,1617037640403
17,1617037666807
18,1617037691152
19,1617037716507
20,1617037741911
21,1617037768953
22,1617037792863
23,1617037818568
24,1617037843568
25,1617037868886
26,1617037895174
27,1617037919271
28,1617037944623
29,1617037972091
30,1617037996433
31,1617038021679
32,1617038048180
33,1617038072335
34,1617038097525
35,1617038123139
36,1617038148204
37,1617038174089
38,1617038199599
39,1617038224782
40,1617038250286
41,1617038275657
42,1617038300960
43,1617038326300
44,1617038352154
45,1617038377198
46,1617038403661
47,1617038428202
48,1617038453205
49,1617038478566
50,1617038504957
51,1617038529208
52,1617038554615

@@ -0,0 +1,2 @@
stageId,attemptId,name,numTasks,failureReason_first100char
4,0,attachTree at Spark300Shims.scala:624,1000,Job 0 cancelled as part of cancellation of all jobs
@@ -0,0 +1,6 @@
stageId,stageAttemptId,taskId,attempt,endReason_first36char
4,0,2842,0,ExceptionFailure(ai.rapids.cudf.Cudf
4,0,2858,0,TaskKilled(another attempt succeeded
4,0,2884,0,TaskKilled(another attempt succeeded
4,0,2908,0,TaskKilled(another attempt succeeded
4,0,3410,1,ExceptionFailure(ai.rapids.cudf.Cudf
@@ -0,0 +1,3 @@
sqlID,nodeID,nodeName,nodeDesc
0,3,MapElements,MapElements com.nvidia.spark.rapids.tool.profiling.QualificationInfoSuite$$$Lambda$1571/993650587@7b
0,4,Filter,Filter com.nvidia.spark.rapids.tool.profiling.QualificationInfoSuite$$$Lambda$1569/1828787392@2eb6d3
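Both rows are Dataset lambda nodes (`MapElements` and `Filter`), which is why `isDataSetPlan` reports them as potentially unsupported when `gpuMode` is on.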
34,601 changes: 34,601 additions & 0 deletions tools/src/test/resources/spark-events-profiling/executors_removed_eventlog

Large diffs are not rendered by default.

6,894 changes: 6,894 additions & 0 deletions tools/src/test/resources/spark-events-profiling/task_job_failure_eventlog

Large diffs are not rendered by default.
