From 808bcc262d6571fc0b221933321aa59d88439562 Mon Sep 17 00:00:00 2001
From: Thomas Graves
Date: Tue, 24 May 2022 16:50:05 -0500
Subject: [PATCH] Qualification tool: update SQL Df value used and look at
 jobs in SQL (#5612)

* QualificationTool. Add speedup information to AppSummaryInfo

Signed-off-by: Ahmed Hussein (amahussein)

* address review comments

Signed-off-by: Ahmed Hussein (amahussein)

* debug
* check for dataset
* change dataset check for all

Signed-off-by: Thomas Graves

* test unsupported time
* more changes
* fix to string
* fix including wholestage codegen
* put unsupported Dur
* calculate duration of non sql stages
* change to get stage task time
* combine
* hooking up final output
* initial scores changes
* logging
* logging
* update factor
* turn off some logging
* debug
* track execs without stages
* Add in exec info output
* fix output
* add sorting
* fix output
* fix output sizes
* use plan infos without execs removed
* output children node ids
* output stages info
* cleanup
* fix running app
* fix stage header

Signed-off-by: Thomas Graves

* Start removing unneeded fields
* Update summary table
* cleanup
* fix sorting
* update running app
* update test
* fix missing
* fix more reporting to be based on supported execs
* fixes
* fix test
* fix event processor calling base
* fix df duration
* debug
* debug
* fix double and int
* fix double
* fix merge
* fix double
* fix divide 0
* fix double to 2 precision
* fix formatting output
* fix sorting
* fix Running
* move around sorting
* remove logWarnings
* update sorting
* Add appId to execs report
* update stages output
* remove unused imports
* fix running app
* update tests
* fix sorting
* add estimated info to summary info
* fix running
* try using enum
* fix recommendation to string
* update to use recommended/strongly recommended
* fix recommendation
* fix divide 0
* fix opportunity
* fix up df task duration
* debug
* fix bug with estimated in csv
* rearrange code
* cleanup
* cleanup and start handling failures
* handle failures and cleanup
* fix output
* change sorting
* fixes
* handle test having udf
* change speedup of the *InPandas and arrow eval
* fix test
* fix ui after changing field names

Signed-off-by: Ahmed Hussein (amahussein)

* make ExecInfo regular class
* fix commented out
* fix more execinfo
* update test schema
* change what goes to DF
* move to outer class
* fix schema
* fix limit option
* remove extra header csv
* match up test with csv
* Fix not supported read formats
* update results
* 2 places for average speedup
* update results
* update results
* comment out tests
* update test
* update operator scores
* cleanup
* test sql df times
* Update more to use spark reported df duration

Signed-off-by: Thomas Graves

* fix patch longest sql duration
* fix task duration
* try to calculate sql overhead
* only use stages used in sql
* remove some decimal checks
* cleanup utils
* calculate task time for stages in jobs in sql but not in execs
* look at execs without stages
* change to account for sql ids without stage mapping
* dedup
* fix compile
* fix set
* update test results
* fix merge
* fix extra code
* update csv expected results
* move logging
* aggregate WholeStageCodegen and children stages
* commonize code
* change type to not be seq of set
* update latest test results
* update dsv2 results
* remove logging
* typo
* Handle case sql duration > app duration
* remove unused variable

Co-authored-by: Ahmed Hussein (amahussein)
---
 .../planparser/WholeStageExecParser.scala     |  11 +-
 .../qualification/PluginTypeChecker.scala     |   7 +-
 .../tool/qualification/QualOutputWriter.scala |  15 +-
 .../spark/sql/rapids/tool/AppBase.scala       |   1 +
 .../sql/rapids/tool/EventProcessorBase.scala  |   1 +
 .../qualification/QualificationAppInfo.scala  | 192 ++++++++++++------
 .../QualificationEventProcessor.scala         |   2 -
 .../complex_dec_expectation.csv               |   2 +-
 .../db_sim_test_expectation.csv               |   2 +-
 .../directory_test_expectation.csv            |   2 +-
 .../jdbc_expectation.csv                      |   2 +-
 .../nds_q86_fail_test_expectation.csv         |   2 +-
 .../nds_q86_test_expectation.csv              |   2 +-
 .../nested_dsv2_expectation.csv               |   2 +-
 .../nested_type_expectation.csv               |   2 +-
 .../qual_test_missing_sql_end_expectation.csv |   2 +-
 .../qual_test_simple_expectation.csv          |   8 +-
 .../read_dsv1_expectation.csv                 |   2 +-
 .../read_dsv2_expectation.csv                 |   2 +-
 .../spark2_expectation.csv                    |   2 +-
 .../truncated_1_end_expectation.csv           |   2 +-
 .../write_format_expectation.csv              |   2 +-
 .../tool/profiling/ApplicationInfoSuite.scala |   2 +-
 23 files changed, 169 insertions(+), 98 deletions(-)

diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WholeStageExecParser.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WholeStageExecParser.scala
index 03ec2bfb00a..2b6d343bba9 100644
--- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WholeStageExecParser.scala
+++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/planparser/WholeStageExecParser.scala
@@ -39,17 +39,20 @@ case class WholeStageExecParser(
     val maxDuration = SQLPlanParser.getTotalDuration(accumId, app)
     val stagesInNode = SQLPlanParser.getStagesInSQLNode(node, app)
 
-    val childNodeRes = node.nodes.flatMap { c =>
+    val childNodes = node.nodes.flatMap { c =>
       SQLPlanParser.parsePlanNode(c, sqlID, checker, app)
     }
     // if any of the execs in WholeStageCodegen supported mark this entire thing
     // as supported
-    val anySupported = childNodeRes.exists(_.isSupported == true)
+    val anySupported = childNodes.exists(_.isSupported == true)
     // average speedup across the execs in the WholeStageCodegen for now
-    val supportedChildren = childNodeRes.filterNot(_.shouldRemove)
+    val supportedChildren = childNodes.filterNot(_.shouldRemove)
     val avSpeedupFactor = SQLPlanParser.averageSpeedup(supportedChildren.map(_.speedupFactor))
+    // can't rely on the WholeStageCodegen having a stage if its children do, so
+    // aggregate them together for now
+    val allStagesIncludingChildren = childNodes.flatMap(_.stages).toSet ++ stagesInNode.toSet
     val execInfo = new ExecInfo(sqlID, node.name, node.name, avSpeedupFactor, maxDuration,
-      node.id, anySupported, Some(childNodeRes), stagesInNode)
+      node.id, anySupported, Some(childNodes), allStagesIncludingChildren.toSeq)
     Seq(execInfo)
   }
 }
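To illustrate the stage aggregation above: a WholeStageCodegen wrapper may not report a stage of its own even when its child execs do, so the parser now unions the children's stages with the wrapper's own. A minimal sketch of that union, using SimpleExec as a hypothetical stand-in for the tool's ExecInfo (the real class carries many more fields):

    case class SimpleExec(name: String, stages: Seq[Int])

    // union the stages seen on the wrapper node itself with the stages its
    // children were assigned, deduplicating via a Set
    def aggregateStages(nodeStages: Seq[Int], children: Seq[SimpleExec]): Seq[Int] =
      (children.flatMap(_.stages).toSet ++ nodeStages.toSet).toSeq

    // a codegen wrapper with no stage of its own still inherits stage 3
    // from its children:
    // aggregateStages(Seq.empty, Seq(SimpleExec("Filter", Seq(3)))) == Seq(3)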
diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala
index 9d4fa2de30a..1687aae38ef 100644
--- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala
+++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala
@@ -200,12 +200,7 @@ class PluginTypeChecker extends Logging {
     // check if any of the not supported types are in the schema
     val nsFiltered = dtSupMap(NS).filter(t => schemaLower.contains(t.toLowerCase()))
     if (nsFiltered.nonEmpty) {
-      val deDuped = if (nsFiltered.contains("dec") && nsFiltered.contains("decimal")) {
-        nsFiltered.filterNot(_.equals("dec"))
-      } else {
-        nsFiltered
-      }
-      (0.0, deDuped.toSet)
+      (0.0, nsFiltered.toSet)
     } else {
       // Started out giving different weights based on partial support and so forth
       // but decided to be optimistic and not penalize if we don't know, perhaps
diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala
index 24a0b6a2903..495e23be31b 100644
--- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala
+++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualOutputWriter.scala
@@ -75,7 +75,7 @@
     val plans = sums.flatMap(_.planInfo)
     val allExecs = QualOutputWriter.getAllExecsFromPlan(plans)
     val headersAndSizes = QualOutputWriter
-      .getDetailedExecsHeaderStringsAndSizes(sums, allExecs)
+      .getDetailedExecsHeaderStringsAndSizes(sums, allExecs.toSeq)
     csvFileWriter.write(QualOutputWriter.constructDetailedHeader(headersAndSizes, ",", false))
     sums.foreach { sumInfo =>
       val rows = QualOutputWriter.constructExecsInfo(sumInfo, headersAndSizes, ",", false)
@@ -165,6 +165,7 @@ object QualOutputWriter {
   val ESTIMATED_GPU_DURATION = "Estimated GPU Duration"
   val ESTIMATED_GPU_SPEEDUP = "Estimated GPU Speedup"
   val ESTIMATED_GPU_TIMESAVED = "Estimated GPU Time Saved"
+  val STAGE_ESTIMATED_STR = "Stage Estimated"
 
   val APP_DUR_STR_SIZE: Int = APP_DUR_STR.size
   val SQL_DUR_STR_SIZE: Int = SQL_DUR_STR.size
@@ -360,7 +361,8 @@ object QualOutputWriter {
       STAGE_ID_STR -> STAGE_ID_STR.size,
       AVERAGE_SPEEDUP_STR -> AVERAGE_SPEEDUP_STR.size,
       STAGE_DUR_STR -> STAGE_DUR_STR.size,
-      UNSUPPORTED_TASK_DURATION_STR -> UNSUPPORTED_TASK_DURATION_STR.size
+      UNSUPPORTED_TASK_DURATION_STR -> UNSUPPORTED_TASK_DURATION_STR.size,
+      STAGE_ESTIMATED_STR -> STAGE_ESTIMATED_STR.size
     )
     detailedHeadersAndFields
   }
@@ -377,23 +379,24 @@ object QualOutputWriter {
       info.stageId.toString -> headersAndSizes(STAGE_ID_STR),
       f"${info.averageSpeedup}%1.2f" -> headersAndSizes(AVERAGE_SPEEDUP_STR),
       info.stageTaskTime.toString -> headersAndSizes(STAGE_DUR_STR),
-      info.unsupportedTaskDur.toString -> headersAndSizes(UNSUPPORTED_TASK_DURATION_STR))
+      info.unsupportedTaskDur.toString -> headersAndSizes(UNSUPPORTED_TASK_DURATION_STR),
+      info.estimated.toString -> headersAndSizes(STAGE_ESTIMATED_STR))
       constructOutputRow(data, delimiter, prettyPrint)
     }
   }
 
-  def getAllExecsFromPlan(plans: Seq[PlanInfo]): Seq[ExecInfo] = {
+  def getAllExecsFromPlan(plans: Seq[PlanInfo]): Set[ExecInfo] = {
     val topExecInfo = plans.flatMap(_.execInfo)
     topExecInfo.flatMap { e =>
       e.children.getOrElse(Seq.empty) :+ e
-    }
+    }.toSet
   }
 
   def constructExecsInfo(
       sumInfo: QualificationSummaryInfo,
       headersAndSizes: LinkedHashMap[String, Int],
       delimiter: String = "|",
-      prettyPrint: Boolean): Seq[String] = {
+      prettyPrint: Boolean): Set[String] = {
     val allExecs = getAllExecsFromPlan(sumInfo.planInfo)
     val appId = sumInfo.appId
     allExecs.flatMap { info =>
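getAllExecsFromPlan now returns a Set so that an exec reachable both as a child of a wrapper and on its own is reported once. A minimal sketch of that flatten-then-dedup shape, with SimpleExec as an illustrative stand-in, not the tool's real type:

    case class SimpleExec(name: String, children: Option[Seq[SimpleExec]] = None)

    // flatten each top-level exec together with its (one level of) children,
    // then deduplicate
    def allExecs(top: Seq[SimpleExec]): Set[SimpleExec] =
      top.flatMap(e => e.children.getOrElse(Seq.empty) :+ e).toSet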
diff --git a/tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala b/tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala
index a7bfcb52bb6..edcb361f831 100644
--- a/tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala
+++ b/tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala
@@ -47,6 +47,7 @@ abstract class AppBase(
 
   // jobId to job info
   val jobIdToInfo = new HashMap[Int, JobInfoClass]()
+  val jobIdToSqlID: HashMap[Int, Long] = HashMap.empty[Int, Long]
 
   // SQL containing any Dataset operation or RDD to DataSet/DataFrame operation
   val sqlIDToDataSetOrRDDCase: HashSet[Long] = HashSet[Long]()
diff --git a/tools/src/main/scala/org/apache/spark/sql/rapids/tool/EventProcessorBase.scala b/tools/src/main/scala/org/apache/spark/sql/rapids/tool/EventProcessorBase.scala
index 55553f99f03..1d8dfb9139a 100644
--- a/tools/src/main/scala/org/apache/spark/sql/rapids/tool/EventProcessorBase.scala
+++ b/tools/src/main/scala/org/apache/spark/sql/rapids/tool/EventProcessorBase.scala
@@ -318,6 +318,7 @@ abstract class EventProcessorBase[T <: AppBase](app: T) extends SparkListener wi
       ProfileUtils.isPluginEnabled(event.properties.asScala) || app.gpuMode
     )
     app.jobIdToInfo.put(event.jobId, thisJob)
+    sqlID.foreach(app.jobIdToSqlID(event.jobId) = _)
   }
 
   override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
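The new jobIdToSqlID map records, at job start, which SQL execution a job belongs to (sqlID is an Option here, hence the foreach). Roughly how the two maps combine later to recover every stage for a SQL query; a sketch using hypothetical simplified bookkeeping, not the tool's exact classes:

    import scala.collection.mutable.HashMap

    // hypothetical simplified stand-ins for the tool's per-app state
    val jobIdToSqlID = HashMap[Int, Long]()
    val jobIdToStageIds = HashMap[Int, Seq[Int]]()

    // every stage belonging to any job of the given SQL execution
    def stagesForSql(sqlID: Long): Seq[Int] =
      jobIdToSqlID.collect { case (jobId, sql) if sql == sqlID => jobId }
        .toSeq
        .flatMap(jobId => jobIdToStageIds.getOrElse(jobId, Seq.empty))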
diff --git a/tools/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/tools/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
index 48e837c0214..aacffb15998 100644
--- a/tools/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
+++ b/tools/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala
@@ -39,7 +39,6 @@ class QualificationAppInfo(
   var appId: String = ""
   var lastJobEndTime: Option[Long] = None
   var lastSQLEndTime: Option[Long] = None
-  var longestSQLDuration: Long = 0
   val writeDataFormat: ArrayBuffer[String] = ArrayBuffer[String]()
 
   var appInfo: Option[QualApplicationInfo] = None
@@ -51,7 +50,6 @@ class QualificationAppInfo(
     HashMap.empty[Long, StageTaskQualificationSummary]
   val stageIdToSqlID: HashMap[Int, Long] = HashMap.empty[Int, Long]
 
-  val jobIdToSqlID: HashMap[Int, Long] = HashMap.empty[Int, Long]
   val sqlIDtoFailures: HashMap[Long, ArrayBuffer[String]] =
     HashMap.empty[Long, ArrayBuffer[String]]
   val notSupportFormatAndTypes: HashMap[String, Set[String]] = HashMap[String, Set[String]]()
@@ -116,14 +114,14 @@ class QualificationAppInfo(
     var pivot = startTime
     var overhead : Long = 0
 
-    sortedJobs.foreach(job => {
+    sortedJobs.foreach { job =>
       val timeDiff = job.startTime - pivot
       if (timeDiff > 0) {
         overhead += timeDiff
       }
       // if jobEndTime is not set, use job.startTime
-      pivot = Math max(pivot, job.endTime.getOrElse(job.startTime))
-    })
+      pivot = Math.max(pivot, job.endTime.getOrElse(job.startTime))
+    }
     overhead
   }
@@ -142,16 +140,16 @@ class QualificationAppInfo(
     ToolUtils.calculateDurationPercent(totalCpuTime, totalRunTime)
   }
 
-  private def calculateSQLSupportedTaskDuration(all: Seq[Set[StageQualSummaryInfo]]): Long = {
-    all.flatMap(_.map(s => s.stageTaskTime - s.unsupportedTaskDur)).sum
+  private def calculateSQLSupportedTaskDuration(all: Seq[StageQualSummaryInfo]): Long = {
+    all.map(s => s.stageTaskTime - s.unsupportedTaskDur).sum
   }
 
-  private def calculateSQLUnsupportedTaskDuration(all: Seq[Set[StageQualSummaryInfo]]): Long = {
-    all.flatMap(_.map(_.unsupportedTaskDur)).sum
+  private def calculateSQLUnsupportedTaskDuration(all: Seq[StageQualSummaryInfo]): Long = {
+    all.map(_.unsupportedTaskDur).sum
   }
 
-  private def calculateSpeedupFactor(all: Seq[Set[StageQualSummaryInfo]]): Double = {
-    val allSpeedupFactors = all.flatMap(_.map(_.averageSpeedup))
+  private def calculateSpeedupFactor(all: Seq[StageQualSummaryInfo]): Double = {
+    val allSpeedupFactors = all.filter(_.stageTaskTime > 0).map(_.averageSpeedup)
     val res = SQLPlanParser.averageSpeedup(allSpeedupFactors)
     res
   }
@@ -174,49 +172,105 @@ class QualificationAppInfo(
     }
   }
 
-  private def getStageToExec(execInfos: Seq[ExecInfo]): Map[Int, Seq[ExecInfo]] = {
-    execInfos.flatMap { execInfo =>
+  private def getStageToExec(execInfos: Seq[ExecInfo]): (Map[Int, Seq[ExecInfo]], Seq[ExecInfo]) = {
+    val execsWithoutStages = new ArrayBuffer[ExecInfo]()
+    val perStageSum = execInfos.flatMap { execInfo =>
       if (execInfo.stages.size > 1) {
         execInfo.stages.map((_, execInfo))
       } else if (execInfo.stages.size < 1) {
         // we don't know what stage it's in or its duration
         logDebug(s"No stage associated with ${execInfo.exec} " +
           s"so speedup factor isn't applied anywhere.")
+        execsWithoutStages += execInfo
         Seq.empty
       } else {
         Seq((execInfo.stages.head, execInfo))
       }
-    }.groupBy(_._1).map { case (k, v) =>
-      (k, v.map(_._2))
+    }.groupBy(_._1).map { case (stage, execInfos) =>
+      (stage, execInfos.map(_._2))
+    }
+    (perStageSum, execsWithoutStages.toSeq)
+  }
+
+  private def flattenedExecs(execs: Seq[ExecInfo]): Seq[ExecInfo] = {
+    // need to remove the WholeStageCodegen wrappers since they aren't actual
+    // execs that we want to get timings of
+    execs.flatMap { e =>
+      if (e.exec.contains("WholeStageCodegen")) {
+        e.children.getOrElse(Seq.empty)
+      } else {
+        e.children.getOrElse(Seq.empty) :+ e
+      }
+    }
+  }
+
+  private def getAllStagesForJobsInSqlQuery(sqlID: Long): Seq[Int] = {
+    val jobsIdsInSQLQuery = jobIdToSqlID.filter { case (_, sqlIdForJob) =>
+      sqlIdForJob == sqlID
+    }.keys.toSeq
+    jobsIdsInSQLQuery.flatMap { jId =>
+      jobIdToInfo(jId).stageIds
     }
   }
 
-  def summarizeStageLevel(allStagesToExecs: Map[Int, Seq[ExecInfo]]): Set[StageQualSummaryInfo] = {
-    // if it doesn't have a stage id associated we can't calculate the time spent in that
-    // SQL so we just drop it
-    val stageIds = allStagesToExecs.keys.toSet
-    stageIds.map { stageId =>
+  private def stagesSummary(execInfos: Seq[ExecInfo],
+      stages: Seq[Int], estimated: Boolean): Set[StageQualSummaryInfo] = {
+    val allStageTaskTime = stages.map { stageId =>
+      stageIdToTaskEndSum.get(stageId).map(_.totalTaskDuration).getOrElse(0L)
+    }.sum
+    val allSpeedupFactorAvg = SQLPlanParser.averageSpeedup(execInfos.map(_.speedupFactor))
+    val allFlattenedExecs = flattenedExecs(execInfos)
+    val numUnsupported = allFlattenedExecs.filterNot(_.isSupported)
+    // if we have unsupported try to guess at how much time. For now divide
+    // time by number of execs and give each one equal weight
+    val eachExecTime = allStageTaskTime / allFlattenedExecs.size
+    val unsupportedDur = eachExecTime * numUnsupported.size
+    // split unsupported per stage
+    val eachStageUnsupported = unsupportedDur / stages.size
+    stages.map { stageId =>
       val stageTaskTime = stageIdToTaskEndSum.get(stageId)
         .map(_.totalTaskDuration).getOrElse(0L)
-      val execsForStage = allStagesToExecs.getOrElse(stageId, Seq.empty)
-      val speedupFactors = execsForStage.map(_.speedupFactor)
-      val averageSpeedupFactor = SQLPlanParser.averageSpeedup(speedupFactors)
-      // need to remove the WholeStageCodegen wrappers since they aren't actual
-      // execs that we want to get timings of
-      val allFlattenedExecs = execsForStage.flatMap { e =>
-        if (e.exec.contains("WholeStageCodegen")) {
-          e.children.getOrElse(Seq.empty)
+      StageQualSummaryInfo(stageId, allSpeedupFactorAvg, stageTaskTime,
+        eachStageUnsupported, estimated)
+    }.toSet
+  }
+
+  def summarizeStageLevel(execInfos: Seq[ExecInfo], sqlID: Long): Set[StageQualSummaryInfo] = {
+    val (allStagesToExecs, execsNoStage) = getStageToExec(execInfos)
+
+    if (allStagesToExecs.isEmpty) {
+      // use job level
+      // also get the job ids associated with the SQLId
+      val allStagesBasedOnJobs = getAllStagesForJobsInSqlQuery(sqlID)
+      if (allStagesBasedOnJobs.isEmpty) {
+        Set.empty
+      } else {
+        // we don't know which execs map to each stage so we are going to cheat somewhat and
+        // apply equally and then just split apart for per stage reporting
+        stagesSummary(execInfos, allStagesBasedOnJobs, true)
+      }
+    } else {
+      val stageIdsWithExecs = allStagesToExecs.keys.toSet
+      val allStagesBasedOnJobs = getAllStagesForJobsInSqlQuery(sqlID)
+      val stagesNotAccounted = allStagesBasedOnJobs.toSet -- stageIdsWithExecs
+      val stageSummaryNotMapped = if (stagesNotAccounted.nonEmpty) {
+        if (execsNoStage.nonEmpty) {
+          stagesSummary(execsNoStage, stagesNotAccounted.toSeq, true)
         } else {
-          e.children.getOrElse(Seq.empty) :+ e
+          // weird case, stages but not associated execs, not sure what to do with this so
+          // just drop for now
+          Set.empty
         }
+      } else {
+        Set.empty
       }
-      val numUnsupported = allFlattenedExecs.filterNot(_.isSupported)
-      // if we have unsupported try to guess at how much time. For now divide
-      // time by number of execs and give each one equal weight
-      val eachExecTime = stageTaskTime / allFlattenedExecs.size
-      val unsupportedDur = eachExecTime * numUnsupported.size
-
-      StageQualSummaryInfo(stageId, averageSpeedupFactor, stageTaskTime, unsupportedDur)
+      // if it doesn't have a stage id associated we can't calculate the time spent in that
+      // SQL so we just drop it
+      val stagesFromExecs = stageIdsWithExecs.flatMap { stageId =>
+        val execsForStage = allStagesToExecs.getOrElse(stageId, Seq.empty)
+        stagesSummary(execsForStage, Seq(stageId), false)
+      }
+      stagesFromExecs ++ stageSummaryNotMapped
     }
   }
@@ -225,28 +279,26 @@ class QualificationAppInfo(
     val perSQLId = pInfo.execInfo.groupBy(_.sqlID)
     perSQLId.map { case (sqlID, execInfos) =>
       val sqlWallClockDuration = sqlIdToInfo.get(sqlID).flatMap(_.duration).getOrElse(0L)
-      // there are issues with duration in whole stage code gen where duration of multiple
+      // There are issues with duration in whole stage code gen where duration of multiple
       // execs is more than entire stage time, for now ignore the exec duration and just
-      // calculate based on average applied to total task time of each stage
-      val allStagesToExecs = getStageToExec(execInfos)
+      // calculate based on average applied to total task time of each stage.
+      // Also note that the below can map multiple stages to the same exec for things like
+      // shuffle.
+
       // if it doesn't have a stage id associated we can't calculate the time spent in that
       // SQL so we just drop it
-      val stageSum = summarizeStageLevel(allStagesToExecs)
+      val stageSum = summarizeStageLevel(execInfos, sqlID)
+
       val numUnsupportedExecs = execInfos.filterNot(_.isSupported).size
       // This is a guestimate at how much wall clock was supported
       val numExecs = execInfos.size.toDouble
       val numSupportedExecs = (numExecs - numUnsupportedExecs).toDouble
       val ratio = numSupportedExecs / numExecs
-      val hackEstimateWallclockSupported = (sqlWallClockDuration * ratio).toInt
-      if (hackEstimateWallclockSupported > longestSQLDuration) {
-        longestSQLDuration = hackEstimateWallclockSupported
-      }
-      // TODO - do we need to estimate based on supported execs?
-      // for now just take the time as is
+      val estimateWallclockSupported = (sqlWallClockDuration * ratio).toInt
+      // don't worry about supported execs here as these are mostly an indicator of I/O
       val execRunTime = sqlIDToTaskEndSum.get(sqlID).map(_.executorRunTime).getOrElse(0L)
       val execCPUTime = sqlIDToTaskEndSum.get(sqlID).map(_.executorCPUTime).getOrElse(0L)
-
-      SQLStageSummary(stageSum, sqlID, hackEstimateWallclockSupported,
+      SQLStageSummary(stageSum, sqlID, estimateWallclockSupported,
         execCPUTime, execRunTime)
     }
   }
@@ -303,18 +355,28 @@ class QualificationAppInfo(
       // wall clock time
       val executorCpuTimePercent = calculateCpuTimePercent(perSqlStageSummary)
 
-      // TODO - do we want to use this and rely on stages, but some SQL don't have stages
-      // so this is less than SQL DF real
-      val sqlDFWallClockDuration =
-        perSqlStageSummary.map(s => s.hackEstimateWallclockSupported).sum
-      val allStagesSummary = perSqlStageSummary.map(_.stageSum)
-      val sqlDataframeTaskDuration = allStagesSummary.flatMap(_.map(s => s.stageTaskTime)).sum
+      // Using the spark SQL reported duration, this could be a bit off from the
+      // task times because it relies on the stage times and we might not have
+      // a stage for every exec
+      val allSQLDurations = sqlIdToInfo.map { case (_, info) =>
+        info.duration.getOrElse(0L)
+      }
+      val sparkSQLDFWallClockDuration = allSQLDurations.sum
+      val longestSQLDuration = if (allSQLDurations.size > 0) {
+        allSQLDurations.max
+      } else {
+        0L
+      }
+      val allStagesSummary = perSqlStageSummary.flatMap(_.stageSum)
+      val sqlDataframeTaskDuration = allStagesSummary.map(s => s.stageTaskTime).sum
       val unsupportedSQLTaskDuration = calculateSQLUnsupportedTaskDuration(allStagesSummary)
       val endDurationEstimated = this.appEndTime.isEmpty && appDuration > 0
       val jobOverheadTime = calculateJobOverHeadTime(info.startTime)
       val nonSQLDataframeTaskDuration =
         calculateNonSQLTaskDataframeDuration(sqlDataframeTaskDuration)
       val nonSQLTaskDuration = nonSQLDataframeTaskDuration + jobOverheadTime
+      // note that these ratios are based off the stage times which may be missing some stage
+      // overhead or execs that didn't have associated stages
       val supportedSQLTaskDuration = calculateSQLSupportedTaskDuration(allStagesSummary)
       val taskSpeedupFactor = calculateSpeedupFactor(allStagesSummary)
@@ -327,14 +389,14 @@ class QualificationAppInfo(
       val appName = appInfo.map(_.appName).getOrElse("")
       val estimatedInfo = QualificationAppInfo.calculateEstimatedInfoSummary(estimatedGPURatio,
-        sqlDFWallClockDuration, appDuration, taskSpeedupFactor, appName, appId,
+        sparkSQLDFWallClockDuration, appDuration, taskSpeedupFactor, appName, appId,
         sqlIdsWithFailures.nonEmpty)
 
       QualificationSummaryInfo(info.appName, appId, problems,
         executorCpuTimePercent, endDurationEstimated, sqlIdsWithFailures,
         notSupportFormatAndTypesString, getAllReadFileFormats, writeFormat,
-        allComplexTypes, nestedComplexTypes, longestSQLDuration, nonSQLTaskDuration,
-        sqlDataframeTaskDuration, unsupportedSQLTaskDuration, supportedSQLTaskDuration,
+        allComplexTypes, nestedComplexTypes, longestSQLDuration, sqlDataframeTaskDuration,
+        nonSQLTaskDuration, unsupportedSQLTaskDuration, supportedSQLTaskDuration,
         taskSpeedupFactor, info.sparkUser, info.startTime, origPlanInfos,
         perSqlStageSummary.map(_.stageSum).flatten, estimatedInfo)
     }
@@ -376,7 +438,7 @@ case class EstimatedSummaryInfo(
 
 case class SQLStageSummary(
     stageSum: Set[StageQualSummaryInfo],
     sqlID: Long,
-    hackEstimateWallclockSupported: Long,
+    estimateWallClockSupported: Long,
     execCPUTime: Long,
     execRunTime: Long)
@@ -434,7 +496,8 @@ case class StageQualSummaryInfo(
     stageId: Int,
     averageSpeedup: Double,
     stageTaskTime: Long,
-    unsupportedTaskDur: Long)
+    unsupportedTaskDur: Long,
+    estimated: Boolean = false)
 
 object Recommendation extends Enumeration {
   type Recommendation = Value
@@ -464,7 +527,14 @@ object QualificationAppInfo extends Logging {
   def calculateEstimatedInfoSummary(estimatedRatio: Double, sqlDataFrameDuration: Long,
       appDuration: Long, speedupFactor: Double, appName: String,
       appId: String, hasFailures: Boolean): EstimatedSummaryInfo = {
-    val speedupOpportunityWallClock = sqlDataFrameDuration * estimatedRatio
+    val sqlDataFrameDurationToUse = if (sqlDataFrameDuration > appDuration) {
+      // our app duration is shorter than our sql duration, estimate the sql duration down
+      // to app duration
+      appDuration
+    } else {
+      sqlDataFrameDuration
+    }
+    val speedupOpportunityWallClock = sqlDataFrameDurationToUse * estimatedRatio
     val estimated_wall_clock_dur_not_on_gpu = appDuration - speedupOpportunityWallClock
     val estimated_gpu_duration = (speedupOpportunityWallClock / speedupFactor) +
       estimated_wall_clock_dur_not_on_gpu
@@ -473,7 +543,7 @@ object QualificationAppInfo extends Logging {
     val recommendation = getRecommendation(estimated_gpu_speedup, hasFailures)
 
     EstimatedSummaryInfo(appName, appId, appDuration,
-      sqlDataFrameDuration, speedupOpportunityWallClock.toLong,
+      sqlDataFrameDurationToUse, speedupOpportunityWallClock.toLong,
       estimated_gpu_duration, estimated_gpu_speedup,
       estimated_gpu_timesaved, recommendation)
   }
diff --git a/tools/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationEventProcessor.scala b/tools/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationEventProcessor.scala
index 97889caedef..e245c444060 100644
--- a/tools/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationEventProcessor.scala
+++ b/tools/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationEventProcessor.scala
@@ -107,7 +107,6 @@ class QualificationEventProcessor(app: QualificationAppInfo)
     super.doSparkListenerSQLExecutionEnd(app, event)
     logDebug("Processing event: " + event.getClass)
     app.lastSQLEndTime = Some(event.time)
-    val sqlInfo = app.sqlStart.get(event.executionId)
     // only include duration if it contains no jobs that failed
     val failures = app.sqlIDtoFailures.get(event.executionId)
     if (event.executionFailure.isDefined || failures.isDefined) {
@@ -130,7 +129,6 @@ class QualificationEventProcessor(app: QualificationAppInfo)
 
     event.stageIds.foreach { stageId =>
       app.stageIdToSqlID.getOrElseUpdate(stageId, sqlID)
     }
-    app.jobIdToSqlID(event.jobId) = sqlID
   }
 }
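The heuristic in stagesSummary above weights every flattened exec equally: the total task time of the stages is divided by the exec count, each unsupported exec "owns" one share, and the resulting unsupported time is split evenly across the stages being summarized. Worked out on made-up numbers, under those same assumptions:

    // say 4 flattened execs over 2 stages totaling 1000ms of task time,
    // with 1 unsupported exec
    val allStageTaskTime = 1000L
    val numExecs = 4
    val numUnsupported = 1
    val numStages = 2

    val eachExecTime = allStageTaskTime / numExecs        // 250ms per exec
    val unsupportedDur = eachExecTime * numUnsupported    // 250ms unsupported
    val eachStageUnsupported = unsupportedDur / numStages // 125ms charged to each stage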
diff --git a/tools/src/test/resources/QualificationExpectations/complex_dec_expectation.csv b/tools/src/test/resources/QualificationExpectations/complex_dec_expectation.csv
index 3e4df81efcd..15d82173127 100644
--- a/tools/src/test/resources/QualificationExpectations/complex_dec_expectation.csv
+++ b/tools/src/test/resources/QualificationExpectations/complex_dec_expectation.csv
@@ -1,2 +1,2 @@
 App Name,App ID,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated
-Spark shell,local-1626104300434,1616,128847,131104,1616,88.35,"","","",struct;lastname:string>;struct;previous:struct;city:string>>;array>;map;map>;map>;array>;array,struct;lastname:string>;struct;previous:struct;city:string>>;array>;map>;map>;array>,NESTED COMPLEX TYPE,840,1469,0,1469,2.00,false
+Spark shell,local-1626104300434,2429,1469,131104,2429,88.35,"","","",struct;lastname:string>;struct;previous:struct;city:string>>;array>;map;map>;map>;array>;array,struct;lastname:string>;struct;previous:struct;city:string>>;array>;map>;map>;array>,NESTED COMPLEX TYPE,1260,128847,0,1469,2.00,false
diff --git a/tools/src/test/resources/QualificationExpectations/db_sim_test_expectation.csv b/tools/src/test/resources/QualificationExpectations/db_sim_test_expectation.csv
index 776fd317656..b7899ebc041 100644
--- a/tools/src/test/resources/QualificationExpectations/db_sim_test_expectation.csv
+++ b/tools/src/test/resources/QualificationExpectations/db_sim_test_expectation.csv
@@ -1,2 +1,2 @@
 App Name,App ID,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated
-Spark shell,local-1623876083964,98102,9399,133857,76168,91.14,"","","","","","",98102,1417661,316964,1100697,3.27,false
+Spark shell,local-1623876083964,119903,1417661,133857,93094,91.14,"","","","","","",119903,9399,316964,1100697,2.92,false
diff --git a/tools/src/test/resources/QualificationExpectations/directory_test_expectation.csv b/tools/src/test/resources/QualificationExpectations/directory_test_expectation.csv
index 776fd317656..b7899ebc041 100644
--- a/tools/src/test/resources/QualificationExpectations/directory_test_expectation.csv
+++ b/tools/src/test/resources/QualificationExpectations/directory_test_expectation.csv
@@ -1,2 +1,2 @@
 App Name,App ID,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated
-Spark shell,local-1623876083964,98102,9399,133857,76168,91.14,"","","","","","",98102,1417661,316964,1100697,3.27,false
+Spark shell,local-1623876083964,119903,1417661,133857,93094,91.14,"","","","","","",119903,9399,316964,1100697,2.92,false
diff --git a/tools/src/test/resources/QualificationExpectations/jdbc_expectation.csv b/tools/src/test/resources/QualificationExpectations/jdbc_expectation.csv
index 89ec3cca288..8fa38f3882d 100644
--- a/tools/src/test/resources/QualificationExpectations/jdbc_expectation.csv
+++ b/tools/src/test/resources/QualificationExpectations/jdbc_expectation.csv
@@ -1,2 +1,2 @@
 App Name,App ID,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated
-Spark shell,app-20211019113801-0001,2717,545675,571967,2698,28.41,"",JDBC[*],"","","","",1811,18794,127,18667,3.03,false
+Spark shell,app-20211019113801-0001,3627,19894,571967,3503,28.41,"",JDBC[*],"","","","",1812,544575,677,19217,2.65,false
diff --git a/tools/src/test/resources/QualificationExpectations/nds_q86_fail_test_expectation.csv b/tools/src/test/resources/QualificationExpectations/nds_q86_fail_test_expectation.csv
index 8fc58bfefa5..bce681c5582 100644
--- a/tools/src/test/resources/QualificationExpectations/nds_q86_fail_test_expectation.csv
+++ b/tools/src/test/resources/QualificationExpectations/nds_q86_fail_test_expectation.csv
@@ -1,2 +1,2 @@
 App Name,App ID,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated
-TPC-DS Like Bench q86,app-20210319163812-1778,9109,3595714,26171,9109,0.0,24,"","","","","",9109,4320658,0,4320658,2.62,false
+TPC-DS Like Bench q86,app-20210319163812-1778,9569,4320658,26171,9569,0.0,24,"","","","","",9565,3595714,0,4320658,2.53,false
diff --git a/tools/src/test/resources/QualificationExpectations/nds_q86_test_expectation.csv b/tools/src/test/resources/QualificationExpectations/nds_q86_test_expectation.csv
index 1c70fbb4930..2028dafca9e 100644
--- a/tools/src/test/resources/QualificationExpectations/nds_q86_test_expectation.csv
+++ b/tools/src/test/resources/QualificationExpectations/nds_q86_test_expectation.csv
@@ -1,2 +1,2 @@
 App Name,App ID,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated
-TPC-DS Like Bench q86,app-20210319163812-1778,9109,3595714,26171,9109,35.34,"","","","","","",9109,4320658,0,4320658,2.62,false
+TPC-DS Like Bench q86,app-20210319163812-1778,9569,4320658,26171,9569,35.34,"","","","","","",9565,3595714,0,4320658,2.53,false
diff --git a/tools/src/test/resources/QualificationExpectations/nested_dsv2_expectation.csv b/tools/src/test/resources/QualificationExpectations/nested_dsv2_expectation.csv
index 77ae82f8224..204805bd2dc 100644
--- a/tools/src/test/resources/QualificationExpectations/nested_dsv2_expectation.csv
+++ b/tools/src/test/resources/QualificationExpectations/nested_dsv2_expectation.csv
@@ -1,2 +1,2 @@
 App Name,App ID,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated
-Spark shell,local-1630045673160,1294,17979,21200,1294,34.56,"","","",array>;map>;map>;map>;map>;map>,array>;map>,NESTED COMPLEX TYPE,1453,6475,0,6475,2.00,false
+Spark shell,local-1629446106683,1453,6475,17698,1453,27.76,"","","",array>;map>,array>;map>,NESTED COMPLEX TYPE,1453,11904,0,6475,2.00,false
diff --git a/tools/src/test/resources/QualificationExpectations/qual_test_missing_sql_end_expectation.csv b/tools/src/test/resources/QualificationExpectations/qual_test_missing_sql_end_expectation.csv
index a7a2e3f5d1b..7db16c126ca 100644
--- a/tools/src/test/resources/QualificationExpectations/qual_test_missing_sql_end_expectation.csv
+++ b/tools/src/test/resources/QualificationExpectations/qual_test_missing_sql_end_expectation.csv
@@ -1,2 +1,2 @@
 App Name,App ID,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated
-Rapids Spark Profiling Tool Unit Tests,local-1622561780883,0,4003,7673,0,55.94,"","","","","","",0,40448,8096,32352,3.27,false
+Rapids Spark Profiling Tool Unit Tests,local-1622561780883,0,40448,7673,0,55.94,"","","","","","",0,4003,8096,32352,2.92,false
diff --git a/tools/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv b/tools/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv
index f358112aa47..4c86c75b181 100644
--- a/tools/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv
+++ b/tools/src/test/resources/QualificationExpectations/qual_test_simple_expectation.csv
@@ -1,5 +1,5 @@
 App Name,App ID,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated
-Rapids Spark Profiling Tool Unit Tests,local-1622043423018,9538,4717,16319,8123,37.7,"","",JSON,"","","",5844,132257,19616,112641,3.06,false
-Spark shell,local-1651187225439,253,343411,355637,116,87.88,"",JSON[string:bigint:int],"","","","",166,180,97,83,1.37,false
-Spark shell,local-1651188809790,65,133608,166215,3,81.18,"",JSON[string:bigint:int],"","","","",65,283,269,14,1.50,false
-Rapids Spark Profiling Tool Unit Tests,local-1623281204390,0,5793,6240,0,46.27,"",JSON[string:bigint:int],JSON,"","","",0,4666,4664,2,1.25,false
+Rapids Spark Profiling Tool Unit Tests,local-1622043423018,12434,132257,16319,10589,37.7,"","",JSON,"","","",7143,4717,19616,112641,2.79,false
+Spark shell,local-1651187225439,760,180,355637,350,87.88,"",JSON[string:bigint:int],"","","","",498,343411,97,83,1.37,false
+Spark shell,local-1651188809790,911,283,166215,45,81.18,"",JSON[string:bigint:int],"","","","",715,133608,269,14,1.50,false
+Rapids Spark Profiling Tool Unit Tests,local-1623281204390,2032,4666,6240,0,46.27,"",JSON[string:bigint:int],JSON,"","","",1209,5793,4664,2,1.25,false
diff --git a/tools/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv b/tools/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv
index d52b7c189e9..2feccc338bb 100644
--- a/tools/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv
+++ b/tools/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv
@@ -1,2 +1,2 @@
 App Name,App ID,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated
-Spark shell,local-1624371544219,4107,175857,175293,594,72.15,"",JSON[string:double:date:int:bigint];Text[*],JSON,"","","",1859,20421,17464,2957,1.57,false
+Spark shell,local-1624371544219,6695,20421,175293,1034,72.15,"",JSON[string:double:date:int:bigint];Text[*],JSON,"","","",1859,175857,17266,3155,1.67,false
diff --git a/tools/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv b/tools/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv
index 9db7d6d2e1d..674b4a63cf5 100644
--- a/tools/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv
+++ b/tools/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv
@@ -1,2 +1,2 @@
 App Name,App ID,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated
-Spark shell,local-1624371906627,4644,82505,83738,627,71.3,"",Text[*];json[double],JSON,"","","",1984,21802,18855,2947,1.57,false
+Spark shell,local-1624371906627,6760,21802,83738,971,71.3,"",Text[*];json[double],JSON,"","","",1984,82505,18668,3134,1.67,false
diff --git a/tools/src/test/resources/QualificationExpectations/spark2_expectation.csv b/tools/src/test/resources/QualificationExpectations/spark2_expectation.csv
index 6187ba945d7..574695d0743 100644
--- a/tools/src/test/resources/QualificationExpectations/spark2_expectation.csv
+++ b/tools/src/test/resources/QualificationExpectations/spark2_expectation.csv
@@ -1,2 +1,2 @@
 App Name,App ID,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated
-Spark shell,local-1634253215009,881,45177,47063,881,67.64,"",Text[*],"","","","",534,117,0,117,2.24,false
+Spark shell,local-1634253215009,1520,359,47063,1011,67.64,"",Text[*],"","","","",1068,44935,120,239,2.06,false
diff --git a/tools/src/test/resources/QualificationExpectations/truncated_1_end_expectation.csv b/tools/src/test/resources/QualificationExpectations/truncated_1_end_expectation.csv
index 6ed80294489..84b9e2a5ecb 100644
--- a/tools/src/test/resources/QualificationExpectations/truncated_1_end_expectation.csv
+++ b/tools/src/test/resources/QualificationExpectations/truncated_1_end_expectation.csv
@@ -1,2 +1,2 @@
 App Name,App ID,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated
-Rapids Spark Profiling Tool Unit Tests,local-1622043423018,435,11608,4872,62,62.67,"","",JSON,"","","",435,7222,6186,1036,1.20,true
+Rapids Spark Profiling Tool Unit Tests,local-1622043423018,1306,14353,4872,570,62.67,"","",JSON,"","","",1306,4477,8086,6267,2.61,true
diff --git a/tools/src/test/resources/QualificationExpectations/write_format_expectation.csv b/tools/src/test/resources/QualificationExpectations/write_format_expectation.csv
index e6a7f720a46..fa28fed253a 100644
--- a/tools/src/test/resources/QualificationExpectations/write_format_expectation.csv
+++ b/tools/src/test/resources/QualificationExpectations/write_format_expectation.csv
@@ -1,2 +1,2 @@
 App Name,App ID,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated
-Spark shell,local-1629442299891,854,16325,19554,854,91.72,"","",CSV;JSON,"","","",411,920,0,920,1.50,false
+Spark shell,local-1629442299891,1992,920,19554,1992,91.72,"","",CSV;JSON,"","","",1235,16325,0,920,1.50,false
diff --git a/tools/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala b/tools/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala
index 5f6d5864049..f63934ca5a2 100644
--- a/tools/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala
+++ b/tools/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationInfoSuite.scala
@@ -111,7 +111,7 @@ class ApplicationInfoSuite extends FunSuite with Logging {
   test("test rapids jar") {
     var apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
     val appArgs =
-      new ProfileArgs(Array(s"$logDir//rapids_join_eventlog.zstd"))
+      new ProfileArgs(Array(s"$logDir/rapids_join_eventlog.zstd"))
     var index: Int = 1
     val eventlogPaths = appArgs.eventlog()
    for (path <- eventlogPaths) {
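For reference, the estimate in calculateEstimatedInfoSummary (with the new clamp of SQL duration to app duration) reduces to the arithmetic below. The values are made up for illustration, and the final time-saved line is the natural app-duration-minus-estimate reading rather than a line quoted from the patch:

    val appDuration = 10000L          // ms
    val sqlDataFrameDuration = 12000L // longer than the app, so clamped
    val estimatedRatio = 0.8          // fraction of SQL wall clock judged supported
    val speedupFactor = 2.5

    val sqlToUse = math.min(sqlDataFrameDuration, appDuration)               // 10000
    val speedupOpportunity = sqlToUse * estimatedRatio                       // 8000.0
    val notOnGpu = appDuration - speedupOpportunity                          // 2000.0
    val estimatedGpuDuration = speedupOpportunity / speedupFactor + notOnGpu // 5200.0
    val estimatedGpuSpeedup = appDuration / estimatedGpuDuration             // ~1.92
    val estimatedTimeSaved = appDuration - estimatedGpuDuration              // 4800.0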