Skip to content

Commit

Permalink
Fix comparing spark2 and spark3 event logs with executor info missing resource profile id column (#2932)

Browse files (browse the repository at this point in the history)

Signed-off-by: Thomas Graves <tgraves@nvidia.com>
  • Loading branch information
tgravescs authored Jul 14, 2021
1 parent cc556b6 commit 4e5094b
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer
import com.nvidia.spark.rapids.tool.ToolTextFileWriter

import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.rapids.tool.profiling.{ApplicationInfo, SparkPlanInfoWithStage}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

Expand Down Expand Up @@ -186,7 +186,7 @@ class CompareApplications(apps: Seq[ApplicationInfo],
}

// Compare Executors information
def compareExecutorInfo(): Unit = {
def compareExecutorInfo(): DataFrame = {
val messageHeader = "\n\nCompare Executor Information:\n"
var query = ""
var i = 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,7 @@ class ApplicationInfo(
!allDataFrames.contains(s"resourceProfilesDF_$index")) {

s"""select $index as appIndex,
|null as resourceProfileId,
|t.numExecutors, t.totalCores as executorCores,
|bm.maxMem, bm.maxOnHeapMem, bm.maxOffHeapMem,
|null as executorMemory, null as numGpusPerExecutor,
Expand Down Expand Up @@ -744,6 +745,7 @@ class ApplicationInfo(
|""".stripMargin
} else {
s"""select $index as appIndex,
|null as resourceProfileId,
|count(executorID) as numExecutors,
|first(totalCores) as executorCores,
|null as maxMem, null as maxOnHeapMem, null as maxOffHeapMem,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,32 @@ class ApplicationInfoSuite extends FunSuite with Logging {
assert(row1.executorCores.equals(2))
}

// Checks that executor information can be compared across two event logs where one
// of them (presumably the Spark 2 log, going by its file name) yields no resource
// profile id: the comparison must still run, and that application's
// resourceProfileId column must come back as null.
test("test spark2 and spark3 event logs") {
  val appArgs = new ProfileArgs(Array(s"$logDir/tasks_executors_fail_compressed_eventlog.zstd",
    s"$logDir/spark2-eventlog.zstd"))
  val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
  // Application indexes are 1-based and follow the order of the event logs listed above.
  appArgs.eventlog().zipWithIndex.foreach { case (path, position) =>
    apps += new ApplicationInfo(appArgs.numOutputRows.getOrElse(1000), sparkSession,
      EventLogPathProcessor.getEventLogInfo(path,
        sparkSession.sparkContext.hadoopConfiguration).head._1, position + 1)
  }
  assert(apps.size == 2)

  val compare = new CompareApplications(apps, None)
  // Collecting without an exception already shows that both files can be processed together.
  val execinfo = compare.compareExecutorInfo().collect()
  // Guard the positional lookups below so a short result fails with a readable
  // assertion message instead of an index-out-of-bounds error.
  assert(execinfo.length >= 2,
    s"expected executor info rows for both applications but got ${execinfo.length}")

  // The rows are expected to follow the application indexes assigned above, so the
  // spark2 event log's row should be the second one.
  val firstRow = execinfo.head
  assert(firstRow.getInt(firstRow.schema.fieldIndex("resourceProfileId")) === 0)

  val secondRow = execinfo(1)
  assert(secondRow.isNullAt(secondRow.schema.fieldIndex("resourceProfileId")))
}

test("test filename match") {
val matchFileName = "udf"
val appArgs = new ProfileArgs(Array(
Expand Down

0 comments on commit 4e5094b

Please sign in to comment.