Skip to content

Commit

Permalink
Qualification tool: update SQL Df value used and look at jobs in SQL (#…
Browse files Browse the repository at this point in the history
…5612)

* QualificationTool. Add speedup information to AppSummaryInfo

Signed-off-by: Ahmed Hussein (amahussein) <a@ahussein.me>

* address review comments

Signed-off-by: Ahmed Hussein (amahussein) <a@ahussein.me>

* debug

* check for dataset

* change dataset check for all

Signed-off-by: Thomas Graves <tgraves@apache.org>

* test unsupported time

* more changes

* fix to string

* fix including wholestage codegen

* put unsupported Dur

* calculate duration of non sql stages

* change to get stage task time

* combine

* hooking up final output

* initial scores changes

* logging

* logging

* update factor

* turn off some logging

* debug

* track execs without stages

* Add in exec info output

* fix output

* add sorting

* fix output

* fix output sizes

* use plan infos without execs removed

* output children node ids

* output stages info

* cleanup

* fix running app

* fix stage header

Signed-off-by: Thomas Graves <tgraves@apache.org>

* Start removing unneeded fields

* Update summary table

* cleanup

* fix sorting

* update running app

* update test

* fix missing

* fix more reporting to be based on supported execs

* fixes

* fix test

* fix event processor calling base

* fix df duration

* debug

* debug

* fix double and int

* fix double

* fix merge

* fix double

* fix divide 0

* fix double to 2 precision

* fix formatting output

* fix sorting

* fix Running

* move around sorting

* remove logWarnings

* update sorting

* Add appId to execs report

* update stages output

* remove unused imports

* fix running app

* update tests

* fix sorting

* add estimated info to summary info

* fix running

* try using enum

* fix recommendation to string

* update to use recommended/strongly recommended

* fix recommendation

* fix divide 0

* fix opportunity

* fix up df task duration

* debug

* fix bug with estimated in csv

* rearrange code

* cleanup

* cleanup and start handling failures

* handle failures and cleanup

* fix output

* change sorting

* fixes

* handle test having udf

* change speedup of the *InPandas and arrow eval

* fix test

* fix ui after changing field names

Signed-off-by: Ahmed Hussein (amahussein) <a@ahussein.me>

* make ExecInfo regular class

* fix commented out

* fix more execinfo

* update test schema

* change what goes to DF

* move to outer class

* fix schema

* fix limit option

* remove extra header csv

* match up test with csv

* Fix not supported read formats

* update results

* 2 places for average speedup

* update results

* update results

* comment out tests

* update test

* update operator scores

* cleanup

* test sql df times

* Update more to use spark reported df duration

Signed-off-by: Thomas Graves <tgraves@apache.org>

* fix patch longest sql duration

* fix task duration

* try to calculate sql overhead

* only use stages used in sql

* remove some decimal checks

* cleanup utils

* calculate task time for stages in jobs in sql but not in execs

* look at execs without stages

* change to account for sql ids without stage mapping

* dedup

* fix compile

* fix set

* update test results

* fix merge

* fix extra code

* update csv expected results

* move logging

* aggregate WholeStageCodegen and children stages

* commonize code

* change type to not be seq of set

* update latest test results

* update dsv2 results

* remove logging

* typo

* Handle case sql duration > app duration

* remove unused variable

Co-authored-by: Ahmed Hussein (amahussein) <a@ahussein.me>
  • Loading branch information
tgravescs and amahussein authored May 24, 2022
1 parent 0f35d8c commit 808bcc2
Show file tree
Hide file tree
Showing 23 changed files with 169 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,20 @@ case class WholeStageExecParser(
val maxDuration = SQLPlanParser.getTotalDuration(accumId, app)
val stagesInNode = SQLPlanParser.getStagesInSQLNode(node, app)

val childNodeRes = node.nodes.flatMap { c =>
val childNodes = node.nodes.flatMap { c =>
SQLPlanParser.parsePlanNode(c, sqlID, checker, app)
}
// if any of the execs in WholeStageCodegen supported mark this entire thing
// as supported
val anySupported = childNodeRes.exists(_.isSupported == true)
val anySupported = childNodes.exists(_.isSupported == true)
// average speedup across the execs in the WholeStageCodegen for now
val supportedChildren = childNodeRes.filterNot(_.shouldRemove)
val supportedChildren = childNodes.filterNot(_.shouldRemove)
val avSpeedupFactor = SQLPlanParser.averageSpeedup(supportedChildren.map(_.speedupFactor))
// can't rely on the wholeStagecodeGen having a stage if children do so aggregate them together
// for now
val allStagesIncludingChildren = childNodes.flatMap(_.stages).toSet ++ stagesInNode.toSet
val execInfo = new ExecInfo(sqlID, node.name, node.name, avSpeedupFactor, maxDuration,
node.id, anySupported, Some(childNodeRes), stagesInNode)
node.id, anySupported, Some(childNodes), allStagesIncludingChildren.toSeq)
Seq(execInfo)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,12 +200,7 @@ class PluginTypeChecker extends Logging {
// check if any of the not supported types are in the schema
val nsFiltered = dtSupMap(NS).filter(t => schemaLower.contains(t.toLowerCase()))
if (nsFiltered.nonEmpty) {
val deDuped = if (nsFiltered.contains("dec") && nsFiltered.contains("decimal")) {
nsFiltered.filterNot(_.equals("dec"))
} else {
nsFiltered
}
(0.0, deDuped.toSet)
(0.0, nsFiltered.toSet)
} else {
// Started out giving different weights based on partial support and so forth
// but decided to be optimistic and not penalize if we don't know, perhaps
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class QualOutputWriter(outputDir: String, reportReadSchema: Boolean, printStdout
val plans = sums.flatMap(_.planInfo)
val allExecs = QualOutputWriter.getAllExecsFromPlan(plans)
val headersAndSizes = QualOutputWriter
.getDetailedExecsHeaderStringsAndSizes(sums, allExecs)
.getDetailedExecsHeaderStringsAndSizes(sums, allExecs.toSeq)
csvFileWriter.write(QualOutputWriter.constructDetailedHeader(headersAndSizes, ",", false))
sums.foreach { sumInfo =>
val rows = QualOutputWriter.constructExecsInfo(sumInfo, headersAndSizes, ",", false)
Expand Down Expand Up @@ -165,6 +165,7 @@ object QualOutputWriter {
val ESTIMATED_GPU_DURATION = "Estimated GPU Duration"
val ESTIMATED_GPU_SPEEDUP = "Estimated GPU Speedup"
val ESTIMATED_GPU_TIMESAVED = "Estimated GPU Time Saved"
val STAGE_ESTIMATED_STR = "Stage Estimated"

val APP_DUR_STR_SIZE: Int = APP_DUR_STR.size
val SQL_DUR_STR_SIZE: Int = SQL_DUR_STR.size
Expand Down Expand Up @@ -360,7 +361,8 @@ object QualOutputWriter {
STAGE_ID_STR -> STAGE_ID_STR.size,
AVERAGE_SPEEDUP_STR -> AVERAGE_SPEEDUP_STR.size,
STAGE_DUR_STR -> STAGE_DUR_STR.size,
UNSUPPORTED_TASK_DURATION_STR -> UNSUPPORTED_TASK_DURATION_STR.size
UNSUPPORTED_TASK_DURATION_STR -> UNSUPPORTED_TASK_DURATION_STR.size,
STAGE_ESTIMATED_STR -> STAGE_ESTIMATED_STR.size
)
detailedHeadersAndFields
}
Expand All @@ -377,23 +379,24 @@ object QualOutputWriter {
info.stageId.toString -> headersAndSizes(STAGE_ID_STR),
f"${info.averageSpeedup}%1.2f" -> headersAndSizes(AVERAGE_SPEEDUP_STR),
info.stageTaskTime.toString -> headersAndSizes(STAGE_DUR_STR),
info.unsupportedTaskDur.toString -> headersAndSizes(UNSUPPORTED_TASK_DURATION_STR))
info.unsupportedTaskDur.toString -> headersAndSizes(UNSUPPORTED_TASK_DURATION_STR),
info.estimated.toString -> headersAndSizes(STAGE_ESTIMATED_STR))
constructOutputRow(data, delimiter, prettyPrint)
}
}

def getAllExecsFromPlan(plans: Seq[PlanInfo]): Seq[ExecInfo] = {
def getAllExecsFromPlan(plans: Seq[PlanInfo]): Set[ExecInfo] = {
val topExecInfo = plans.flatMap(_.execInfo)
topExecInfo.flatMap { e =>
e.children.getOrElse(Seq.empty) :+ e
}
}.toSet
}

def constructExecsInfo(
sumInfo: QualificationSummaryInfo,
headersAndSizes: LinkedHashMap[String, Int],
delimiter: String = "|",
prettyPrint: Boolean): Seq[String] = {
prettyPrint: Boolean): Set[String] = {
val allExecs = getAllExecsFromPlan(sumInfo.planInfo)
val appId = sumInfo.appId
allExecs.flatMap { info =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ abstract class AppBase(

// jobId to job info
val jobIdToInfo = new HashMap[Int, JobInfoClass]()
val jobIdToSqlID: HashMap[Int, Long] = HashMap.empty[Int, Long]

// SQL containing any Dataset operation or RDD to DataSet/DataFrame operation
val sqlIDToDataSetOrRDDCase: HashSet[Long] = HashSet[Long]()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ abstract class EventProcessorBase[T <: AppBase](app: T) extends SparkListener wi
ProfileUtils.isPluginEnabled(event.properties.asScala) || app.gpuMode
)
app.jobIdToInfo.put(event.jobId, thisJob)
sqlID.foreach(app.jobIdToSqlID(event.jobId) = _)
}

override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
Expand Down
Loading

0 comments on commit 808bcc2

Please sign in to comment.