Analyze and improve Silver Job Runs performance (Spark 3.3.0) #1230

Draft: wants to merge 12 commits into base branch 0900_release
1 change: 1 addition & 0 deletions .tool-versions
@@ -0,0 +1 @@
java zulu-8.56.0.23
7 changes: 4 additions & 3 deletions build.sbt
@@ -4,19 +4,20 @@ organization := "com.databricks.labs"

version := "0.8.2.0"

- scalaVersion := "2.12.12"
+ scalaVersion := "2.12.14"
scalacOptions ++= Seq("-Xmax-classfile-name", "78")

Test / fork := true
Test / envVars := Map("OVERWATCH_ENV" -> " ","OVERWATCH_TOKEN" -> " ","OVERWATCH" -> " ")

- val sparkVersion = "3.1.2"
+ val sparkVersion = "3.3.0"
libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion % Provided
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion % Provided
libraryDependencies += "org.apache.spark" %% "spark-hive" % sparkVersion % Provided
libraryDependencies += "com.databricks" % "dbutils-api_2.12" % "0.0.5" % Provided
libraryDependencies += "com.amazonaws" % "aws-java-sdk-s3" % "1.11.595" % Provided
- libraryDependencies += "io.delta" % "delta-core_2.12" % "1.0.0" % Provided
+ libraryDependencies += "io.delta" %% "delta-core" % "2.1.0" % Provided
libraryDependencies += "org.eclipse.jetty" % "jetty-util" % "9.4.51.v20230217" % Provided
libraryDependencies += "org.scalaj" %% "scalaj-http" % "2.4.2"
libraryDependencies += "com.lihaoyi" %% "sourcecode" % "0.4.1"
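
For context, the delta-core 2.1.x line is the Delta Lake release series built against Spark 3.3.x, matching the sparkVersion bump above. A minimal sketch of a local SparkSession wired for that pairing (standard Delta session settings; the app name and master are illustrative):

import org.apache.spark.sql.SparkSession

// Local session configured for the Spark 3.3.0 / delta-core 2.1.0 pairing declared above.
val spark = SparkSession.builder()
  .appName("overwatch-local")   // illustrative name
  .master("local[*]")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()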

6 changes: 6 additions & 0 deletions project/metals.sbt
@@ -0,0 +1,6 @@
// DO NOT EDIT! This file is auto-generated.

// This file enables sbt-bloop to create bloop config files.

addSbtPlugin("ch.epfl.scala" % "sbt-bloop" % "1.5.15")

@@ -27,18 +27,18 @@ class ETLDefinition(

val transformedDF = transforms.foldLeft(verifiedSourceDF) {
case (df, transform) =>
/*
* reverting Spark UI Job Group labels for now
*
* TODO: enumerate the regressions this would introduce
* when the labels set by the platform are replaced
* this way.
* df.sparkSession.sparkContext.setJobGroup(
* s"${module.pipeline.config.workspaceName}:${module.moduleName}",
* transform.toString)
*/

df.transform( transform)
}
write(transformedDF, module)
}
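
The fold above threads the verified source DataFrame through each transform in order. A self-contained sketch of the same composition pattern, using hypothetical transforms and column names:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Hypothetical transforms, composed the same way the foldLeft above chains them.
val addIngestDate: DataFrame => DataFrame = _.withColumn("ingest_date", current_date())
val dropDupes: DataFrame => DataFrame = _.dropDuplicates("organization_id", "timestamp")

val transforms: Seq[DataFrame => DataFrame] = Seq(addIngestDate, dropDupes)

def runAll(source: DataFrame): DataFrame =
  transforms.foldLeft(source) { case (df, t) => df.transform(t) }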
@@ -8,7 +8,6 @@ import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}

import java.time.Duration
import scala.util.parsing.json.JSON.number

class Module(
val moduleId: Int,
@@ -10,7 +10,7 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Column, DataFrame}


- trait SilverTransforms extends SparkSessionWrapper {
+ trait SilverTransforms extends SparkSessionWrapper with DataFrameSyntax {

import TransformationDescriber._
import spark.implicits._
@@ -1406,53 +1406,124 @@ trait SilverTransforms extends SparkSessionWrapper {
// eagerly force this highly reused DF into cache()
jobRunsLag30D.count()

// Lookup to populate the clusterID/clusterName where missing from jobs
lazy val clusterSpecNameLookup = clusterSpec.asDF
.select('organization_id, 'timestamp, 'cluster_name, 'cluster_id.alias("clusterId"))
.filter('clusterId.isNotNull && 'cluster_name.isNotNull)

// TODO: remove or comment out or change log level or . . .

// Lookup to populate the clusterID/clusterName where missing from jobs
lazy val clusterSnapNameLookup = clusterSnapshot.asDF
.withColumn("timestamp", unix_timestamp('Pipeline_SnapTS) * lit(1000))
.select('organization_id, 'timestamp, 'cluster_name, 'cluster_id.alias("clusterId"))
.filter('clusterId.isNotNull && 'cluster_name.isNotNull)
logger.log( Level.INFO, "Showing first 5 rows of `jobRunsLag30D`:")

// Lookup to populate the existing_cluster_id where missing from jobs -- it can be derived from name
lazy val jobStatusMetaLookup = jobsStatus.asDF
.verifyMinimumSchema(Schema.minimumJobStatusSilverMetaLookupSchema)
.select(
'organization_id,
'timestamp,
'jobId,
'jobName,
to_json('tags).alias("tags"),
'schedule,
'max_concurrent_runs,
'run_as_user_name,
'timeout_seconds,
'created_by,
'last_edited_by,
to_json('tasks).alias("tasks"),
to_json('job_clusters).alias("job_clusters"),
to_json($"task_detail_legacy.notebook_task").alias("notebook_task"),
to_json($"task_detail_legacy.spark_python_task").alias("spark_python_task"),
to_json($"task_detail_legacy.python_wheel_task").alias("python_wheel_task"),
to_json($"task_detail_legacy.spark_jar_task").alias("spark_jar_task"),
to_json($"task_detail_legacy.spark_submit_task").alias("spark_submit_task"),
to_json($"task_detail_legacy.shell_command_task").alias("shell_command_task"),
to_json($"task_detail_legacy.pipeline_task").alias("pipeline_task")
)
jobRunsLag30D
.showLines(5, 20, true)
.foreach( logger.log( Level.INFO, _))
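
The showLines call appears to capture the usual df.show rendering as lines so it can be routed to the logger instead of stdout; the arguments mirror Dataset.show(numRows, truncate, vertical). A hypothetical sketch of such a helper, presumably close to what the DataFrameSyntax mixin provides:

import java.io.{ByteArrayOutputStream, PrintStream}
import org.apache.spark.sql.DataFrame

// Hypothetical helper: capture the console output of df.show and return it line by line.
implicit class DataFrameShowLines(df: DataFrame) {
  def showLines(numRows: Int, truncate: Int, vertical: Boolean): Seq[String] = {
    val buffer = new ByteArrayOutputStream()
    Console.withOut(new PrintStream(buffer, true, "UTF-8")) {
      df.show(numRows, truncate, vertical)
    }
    buffer.toString("UTF-8").split("\n").toSeq
  }
}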

/**
* Look up the cluster_name based on id, first from the cluster
* spec; if not present there, fall back to the latest snapshot
* prior to the run.
*/

val jobRunsAppendClusterName = NamedTransformation {
(df: DataFrame) => {

val key = Seq( "organization_id", "clusterId")

lazy val clusterSpecNameLookup = clusterSpec.asDF
.select(
'organization_id,
'timestamp,
'cluster_name,
'cluster_id.alias("clusterId"))
.filter(
'clusterId.isNotNull
&& 'cluster_name.isNotNull)
.toTSDF( "timestamp", key:_*)

lazy val clusterSnapNameLookup = clusterSnapshot.asDF
.select(
// .withColumn("timestamp", unix_timestamp('Pipeline_SnapTS) * lit(1000))
'organization_id,
( unix_timestamp('Pipeline_SnapTS) * lit(1000)).alias( "timestamp"),
'cluster_name,
'cluster_id.alias( "clusterId"))
.filter(
'clusterId.isNotNull
&& 'cluster_name.isNotNull)
.toTSDF( "timestamp", key:_*)

df.toTSDF( "timestamp", key:_*)
.lookupWhen( clusterSpecNameLookup)
.lookupWhen( clusterSnapNameLookup)
.df
}
}
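
The toTSDF/lookupWhen chain performs a point-in-time lookup: for each run row it takes, per organization_id and clusterId, the most recent lookup value at or before the row's timestamp, populating the column only where it is missing. A plain-Spark sketch of that semantic (an illustration of the assumed behavior, not Overwatch's TSDF implementation):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Illustrative as-of lookup: fill `valueCol` on fact rows with the latest
// lookup value whose timestamp is at or before the fact row's timestamp.
def asOfLookup(
    facts: DataFrame,
    lookup: DataFrame,
    keys: Seq[String],
    tsCol: String,
    valueCol: String): DataFrame = {

  val tagged = facts
    .withColumn("_isFact", lit(true))
    .unionByName(lookup.withColumn("_isFact", lit(false)), allowMissingColumns = true)

  // Lookup rows sort before fact rows at equal timestamps, so same-instant values apply.
  val w = Window
    .partitionBy(keys.map(col): _*)
    .orderBy(col(tsCol), col("_isFact"))
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

  tagged
    .withColumn(
      valueCol,
      coalesce(
        col(valueCol),
        last(when(!col("_isFact"), col(valueCol)), ignoreNulls = true).over(w)))
    .filter(col("_isFact"))
    .drop("_isFact")
}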

lazy val jobSnapNameLookup = jobsSnapshot.asDF
.withColumn("timestamp", unix_timestamp('Pipeline_SnapTS) * lit(1000))
.select('organization_id, 'timestamp, 'job_id.alias("jobId"), $"settings.name".alias("jobName"))
/**
* Look up the job name based on id, first from `job_status_silver`;
* if not present there, fall back to the latest snapshot prior to
* the run.
*/

val jobRunsAppendJobMeta = NamedTransformation {
(df: DataFrame) => {

val key = Seq( "organization_id", "jobId")

lazy val jobStatusMetaLookup = jobsStatus.asDF
.verifyMinimumSchema(
Schema.minimumJobStatusSilverMetaLookupSchema)
.select(
'organization_id,
'timestamp,
'jobId,
'jobName,
to_json('tags).alias("tags"),
'schedule,
'max_concurrent_runs,
'run_as_user_name,
'timeout_seconds,
'created_by,
'last_edited_by,
to_json( 'tasks).alias("tasks"),
to_json( 'job_clusters).alias("job_clusters"),
to_json( $"task_detail_legacy.notebook_task").alias( "notebook_task"),
to_json( $"task_detail_legacy.spark_python_task").alias( "spark_python_task"),
to_json( $"task_detail_legacy.python_wheel_task").alias( "python_wheel_task"),
to_json( $"task_detail_legacy.spark_jar_task").alias( "spark_jar_task"),
to_json( $"task_detail_legacy.spark_submit_task").alias( "spark_submit_task"),
to_json( $"task_detail_legacy.shell_command_task").alias( "shell_command_task"),
to_json( $"task_detail_legacy.pipeline_task").alias( "pipeline_task"))
.toTSDF( "timestamp", key:_*)

lazy val jobSnapshotNameLookup = jobsSnapshot.asDF
// .withColumn("timestamp", unix_timestamp('Pipeline_SnapTS) * lit(1000))
.select(
'organization_id,
( unix_timestamp( 'Pipeline_SnapTS) * lit( 1000)).alias( "timestamp"),
'job_id.alias("jobId"),
$"settings.name".alias("jobName"))
.toTSDF( "timestamp", key:_*)

df.toTSDF( "timestamp", key:_*)
.lookupWhen( jobStatusMetaLookup)
.lookupWhen( jobSnapshotNameLookup)
.df
.withColumns( Map(
"jobName"
-> coalesce('jobName, 'run_name),
"tasks"
-> coalesce('tasks, 'submitRun_tasks),
"job_clusters"
-> coalesce('job_clusters, 'submitRun_job_clusters)))
}
}
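
Taken together, the two named transformations can be chained over the lagged runs before the base derivation below. A hedged usage sketch, assuming the TransformationDescriber implicits let a NamedTransformation be passed to Dataset.transform (as the df.transform(transform) call in ETLDefinition suggests):

// Hypothetical application of the lookups defined above.
val enrichedRuns = jobRunsLag30D
  .transform(jobRunsAppendClusterName) // fill cluster name from the spec, then from snapshots
  .transform(jobRunsAppendJobMeta)     // fill job metadata, falling back to submitted-run fields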

val jobRunsLookups = jobRunsInitializeLookups(
(clusterSpec, clusterSpecNameLookup),
(clusterSnapshot, clusterSnapNameLookup),
(jobsStatus, jobStatusMetaLookup),
(jobsSnapshot, jobSnapNameLookup)
)


// val jobRunsLookups = jobRunsInitializeLookups(
// (clusterSpec, clusterSpecNameLookup),
// (clusterSnapshot, clusterSnapNameLookup),
// (jobsStatus, jobStatusMetaLookup),
// (jobsSnapshot, jobSnapNameLookup)
// )

// caching before structifying
jobRunsDeriveRunsBase(jobRunsLag30D, etlUntilTime)