Profile tool: fix printing of task failed reason (#4848)
* Fix parsing task failed reason

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* Update tests

* fix extra newline

* Update tools/src/test/scala/com/nvidia/spark/rapids/tool/profiling/HealthCheckSuite.scala

Co-authored-by: Jason Lowe <jlowe@nvidia.com>

tgravescs and jlowe authored Feb 23, 2022
1 parent 7a87978 commit 6e29a43
Showing 4 changed files with 24 additions and 11 deletions.

ProfileOutputWriter.scala

@@ -21,8 +21,6 @@ import org.apache.commons.lang3.StringUtils
 class ProfileOutputWriter(outputDir: String, filePrefix: String, numOutputRows: Int,
     outputCSV: Boolean = false) {
 
-  private val CSVDelimiter = ","
-
   private val textFileWriter = new ToolTextFileWriter(outputDir,
     s"$filePrefix.log", "Profile summary")
 
@@ -64,13 +62,14 @@ class ProfileOutputWriter(outputDir: String, filePrefix: String, numOutputRows:
       val csvWriter = new ToolTextFileWriter(outputDir,
         s"${suffix}.csv", s"$header CSV:")
       try {
-        val headerString = outRows.head.outputHeaders.mkString(CSVDelimiter)
+        val headerString = outRows.head.outputHeaders.mkString(ProfileOutputWriter.CSVDelimiter)
         csvWriter.write(headerString + "\n")
         val rows = outRows.map(_.convertToSeq)
         rows.foreach { row =>
-          val delimiterHandledRow = row.map(ProfileUtils.replaceDelimiter(_, CSVDelimiter))
+          val delimiterHandledRow =
+            row.map(ProfileUtils.replaceDelimiter(_, ProfileOutputWriter.CSVDelimiter))
           val formattedRow = delimiterHandledRow.map(stringIfempty(_))
-          val outStr = formattedRow.mkString(CSVDelimiter)
+          val outStr = formattedRow.mkString(ProfileOutputWriter.CSVDelimiter)
           csvWriter.write(outStr + "\n")
         }
       } finally {
@@ -92,6 +91,7 @@ class ProfileOutputWriter(outputDir: String, filePrefix: String, numOutputRows:
 }
 
 object ProfileOutputWriter {
+  val CSVDelimiter = ","
 
   /**
   * Regular expression matching full width characters.
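
The point of this first change is visibility: a private val inside the class was reachable only by that class, while a member of the companion object can be referenced from anywhere as ProfileOutputWriter.CSVDelimiter, which is exactly what the test change below needs. A minimal sketch of the pattern, with hypothetical Writer/Delimiter names standing in for the real ones:

```scala
// Companion objects are Scala's home for "static" members: a constant
// defined here is shared by all instances and visible to outside code
// (subject to access modifiers) as Writer.Delimiter.
class Writer {
  def join(fields: Seq[String]): String = fields.mkString(Writer.Delimiter)
}

object Writer {
  val Delimiter = ","
}
```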

EventsProcessor.scala

@@ -23,6 +23,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import com.nvidia.spark.rapids.tool.profiling._
 
+import org.apache.spark.TaskFailedReason
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.execution.ui.{SparkListenerDriverAccumUpdates, SparkListenerSQLAdaptiveExecutionUpdate, SparkListenerSQLAdaptiveSQLMetricUpdates, SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
@@ -209,12 +210,18 @@ class EventsProcessor(app: ApplicationInfo) extends EventProcessorBase[Applicati
           + res.name + ",value=" + res.value + ",update=" + res.update)
       }
     }
+    val reason = event.reason match {
+      case failed: TaskFailedReason =>
+        failed.toErrorString
+      case _ =>
+        event.reason.toString
+    }
 
     val thisTask = TaskCase(
       event.stageId,
       event.stageAttemptId,
       event.taskType,
-      event.reason.toString,
+      reason,
       event.taskInfo.taskId,
       event.taskInfo.attemptNumber,
       event.taskInfo.launchTime,
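
This hunk is the heart of the fix: event.reason on a task-end event is a TaskEndReason, and calling toString on it produced the terse case-class strings seen in the old expectation file (e.g. "ExecutorLostFailure(2,true,Some(Executor Process Lost))"). Failure reasons specifically are TaskFailedReason instances, which expose a human-readable toErrorString. A self-contained sketch of the same dispatch against Spark's public types:

```scala
import org.apache.spark.{TaskEndReason, TaskFailedReason}

// Failed tasks carry a TaskFailedReason, whose toErrorString renders a
// readable message such as "ExecutorLostFailure (executor 2 exited caused
// by one of the running tasks) ...". Non-failure reasons (e.g. Success)
// have no error string, so they fall back to plain toString.
def describeReason(reason: TaskEndReason): String = reason match {
  case failed: TaskFailedReason => failed.toErrorString
  case other => other.toString
}
```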

tasks_failure_eventlog_expectation.csv

@@ -1,4 +1,4 @@
 appIndex,stageId,stageAttemptId,taskId,attempt,failureReason
-1,238,0,8519,0,"ExecutorLostFailure(2,true,Some(Executor Process Lost))"
-1,238,0,8560,1,"FetchFailed(BlockManagerId(1, hostname-08.domain.com, 46008, None),54,8347,9,72,org.apache.spark.shu"
-1,238,0,8574,1,"FetchFailed(BlockManagerId(1, hostname-08.domain.com, 46008, None),54,8339,1,138,org.apache.spark.sh"
+1,238,0,8519,0,ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Executor Process
+1,238,0,8560,1,FetchFailed(BlockManagerId(1; hostname-08.domain.com; 46008; None); shuffleId=54; mapIndex=9; mapId=
+1,238,0,8574,1,FetchFailed(BlockManagerId(1; hostname-08.domain.com; 46008; None); shuffleId=54; mapIndex=1; mapId=

tools/src/test/scala/com/nvidia/spark/rapids/tool/profiling/HealthCheckSuite.scala

@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2021, NVIDIA CORPORATION.
+ * Copyright (c) 2021-2022, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -58,8 +58,14 @@ class HealthCheckSuite extends FunSuite {
     val healthCheck = new HealthCheck(apps)
     for (app <- apps) {
       val failedTasks = healthCheck.getFailedTasks
+      // the end reason gets the delimiter changed when writing to CSV so to compare properly
+      // change it to be the same here
+      val failedWithDelimiter = failedTasks.map { t =>
+        val delimited = ProfileUtils.replaceDelimiter(t.endReason, ProfileOutputWriter.CSVDelimiter)
+        t.copy(endReason = delimited)
+      }
       import sparkSession.implicits._
-      val taskAccums = failedTasks.toDF
+      val taskAccums = failedWithDelimiter.toDF
       val tasksResultExpectation =
         new File(expRoot, "tasks_failure_eventlog_expectation.csv")
       val tasksDfExpect =
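
The normalization above exists because ProfileOutputWriter runs every field through ProfileUtils.replaceDelimiter before writing CSV, so the raw endReason returned by getFailedTasks would never equal the expectation file's text. A hedged sketch of what the replacement evidently does — the expectation rows above show commas inside values becoming semicolons; the real ProfileUtils.replaceDelimiter may be implemented differently:

```scala
// Hypothetical stand-in for ProfileUtils.replaceDelimiter, inferred from
// the expectation file: occurrences of the CSV delimiter inside a field
// value are swapped for semicolons, so the value needs no quoting.
def replaceDelimiter(value: String, delimiter: String): String =
  value.replace(delimiter, ";")

// replaceDelimiter("FetchFailed(BlockManagerId(1, host, 46008, None), 54)", ",")
// => "FetchFailed(BlockManagerId(1; host; 46008; None); 54)"
```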
