
Refactor lookups in Silver Job Runs
Removed a level of indirection and unnecessary conditional branching in the definition of the chained `lookupWhen` transformations.

Moved definitions so that references to `PipelineTable` objects are in scope rather than passed by argument.
neilbest-db committed Jun 13, 2024
1 parent 5fb52bc commit efdd63f
Showing 2 changed files with 159 additions and 137 deletions.
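To make the shape of the refactor concrete, here is a minimal, self-contained Scala sketch of the before/after pattern. The types are toy stand-ins (plain functions instead of the real Overwatch `DataFrame` and `NamedTransformation`), so this illustrates only the closure-capture idea, not the actual API:

// Hypothetical stand-in for DataFrame => DataFrame, for illustration only.
type Transform = String => String

// Before: each transformation took a Map of lookups built elsewhere and
// had to branch on whether each lookup key was present.
def appendNameOld(lookups: Map[String, Transform]): Transform = { input =>
  val step1 = lookups.get("cluster_spec_silver").map(_(input)).getOrElse(input)
  lookups.get("clusters_snapshot_bronze").map(_(step1)).getOrElse(step1)
}

// After: the lookups are defined where their sources are already in scope,
// so the transformation closes over them directly and chains unconditionally.
lazy val specLookup: Transform = _ + " +spec"
lazy val snapLookup: Transform = _ + " +snap"
val appendNameNew: Transform = input => snapLookup(specLookup(input))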
@@ -1323,74 +1323,124 @@ trait SilverTransforms extends SparkSessionWrapper with DataFrameSyntax {
.showLines(5, 20, true)
.foreach( logger.log( Level.INFO, _))

// Lookup to populate the clusterID/clusterName where missing from jobs
lazy val clusterSpecNameLookup =
clusterSpec.asDF.transformWithDescription(
NamedTransformation { (df: DataFrame) => {
df.select( 'organization_id, 'timestamp, 'cluster_name, 'cluster_id.alias("clusterId"))
.filter('clusterId.isNotNull && 'cluster_name.isNotNull)}})

// Lookup to populate the clusterID/clusterName where missing from jobs
lazy val clusterSnapNameLookup =
clusterSnapshot.asDF.transformWithDescription(
NamedTransformation { (df: DataFrame) => {
df.withColumn("timestamp", unix_timestamp('Pipeline_SnapTS) * lit(1000))
.select('organization_id, 'timestamp, 'cluster_name, 'cluster_id.alias("clusterId"))
.filter('clusterId.isNotNull && 'cluster_name.isNotNull)}})

// Lookup to populate the existing_cluster_id where missing from jobs -- it can be derived from name
lazy val jobStatusMetaLookup =
jobsStatus.asDF.transformWithDescription(
NamedTransformation { (df: DataFrame) => {
df.verifyMinimumSchema( Schema.minimumJobStatusSilverMetaLookupSchema)
.select(
'organization_id,
'timestamp,
'jobId,
'jobName,
to_json('tags).alias("tags"),
'schedule,
'max_concurrent_runs,
'run_as_user_name,
'timeout_seconds,
'created_by,
'last_edited_by,
to_json('tasks).alias("tasks"),
to_json('job_clusters).alias("job_clusters"),
to_json($"task_detail_legacy.notebook_task").alias("notebook_task"),
to_json($"task_detail_legacy.spark_python_task").alias("spark_python_task"),
to_json($"task_detail_legacy.python_wheel_task").alias("python_wheel_task"),
to_json($"task_detail_legacy.spark_jar_task").alias("spark_jar_task"),
to_json($"task_detail_legacy.spark_submit_task").alias("spark_submit_task"),
to_json($"task_detail_legacy.shell_command_task").alias("shell_command_task"),
to_json($"task_detail_legacy.pipeline_task").alias("pipeline_task")
)}})

lazy val jobSnapNameLookup = jobsSnapshot.asDF
.withColumn("timestamp", unix_timestamp('Pipeline_SnapTS) * lit(1000))
.select('organization_id, 'timestamp, 'job_id.alias("jobId"), $"settings.name".alias("jobName"))

val jobRunsLookups = jobRunsInitializeLookups(
(clusterSpec, clusterSpecNameLookup),
(clusterSnapshot, clusterSnapNameLookup),
(jobsStatus, jobStatusMetaLookup),
(jobsSnapshot, jobSnapNameLookup)
)
/**
* Look up the cluster_name based on id, first from
* `cluster_spec_silver`; if not present there, fall back to the
* latest snapshot prior to the run.
*/

val jobRunsAppendClusterName = NamedTransformation {
(df: DataFrame) => {

val key = Seq( "organization_id", "clusterId")

lazy val clusterSpecNameLookup = clusterSpec.asDF
.select(
'organization_id,
'timestamp,
'cluster_name,
'cluster_id.alias("clusterId"))
.filter(
'clusterId.isNotNull
&& 'cluster_name.isNotNull)
.toTSDF( "timestamp", key:_*)

lazy val clusterSnapNameLookup = clusterSnapshot.asDF
.select(
// .withColumn("timestamp", unix_timestamp('Pipeline_SnapTS) * lit(1000))
'organization_id,
( unix_timestamp('Pipeline_SnapTS) * lit(1000)).alias( "timestamp"),
'cluster_name,
'cluster_id.alias( "clusterId"))
.filter(
'clusterId.isNotNull
&& 'cluster_name.isNotNull)
.toTSDF( "timestamp", key:_*)

df.toTSDF( "timestamp", key:_*)
.lookupWhen( clusterSpecNameLookup)
.lookupWhen( clusterSnapNameLookup)
.df
}
}

/**
* Look up the job name based on id, first from
* `job_status_silver`; if not present there, fall back to the
* latest snapshot prior to the run.
*/

val jobRunsAppendJobMeta = NamedTransformation {
(df: DataFrame) => {

val key = Seq( "organization_id", "jobId")

lazy val jobStatusMetaLookup = jobsStatus.asDF
.verifyMinimumSchema(
Schema.minimumJobStatusSilverMetaLookupSchema)
.select(
'organization_id,
'timestamp,
'jobId,
'jobName,
to_json('tags).alias("tags"),
'schedule,
'max_concurrent_runs,
'run_as_user_name,
'timeout_seconds,
'created_by,
'last_edited_by,
to_json( 'tasks).alias("tasks"),
to_json( 'job_clusters).alias("job_clusters"),
to_json( $"task_detail_legacy.notebook_task").alias( "notebook_task"),
to_json( $"task_detail_legacy.spark_python_task").alias( "spark_python_task"),
to_json( $"task_detail_legacy.python_wheel_task").alias( "python_wheel_task"),
to_json( $"task_detail_legacy.spark_jar_task").alias( "spark_jar_task"),
to_json( $"task_detail_legacy.spark_submit_task").alias( "spark_submit_task"),
to_json( $"task_detail_legacy.shell_command_task").alias( "shell_command_task"),
to_json( $"task_detail_legacy.pipeline_task").alias( "pipeline_task"))
.toTSDF( "timestamp", key:_*)

lazy val jobSnapshotNameLookup = jobsSnapshot.asDF
// .withColumn("timestamp", unix_timestamp('Pipeline_SnapTS) * lit(1000))
.select(
'organization_id,
( unix_timestamp( 'Pipeline_SnapTS) * lit( 1000)).alias( "timestamp"),
'job_id.alias("jobId"),
$"settings.name".alias("jobName"))
.toTSDF( "timestamp", key:_*)

df.toTSDF( "timestamp", key:_*)
.lookupWhen( jobStatusMetaLookup)
.lookupWhen( jobSnapshotNameLookup)
.df
.withColumns( Map(
"jobName"
-> coalesce('jobName, 'run_name),
"tasks"
-> coalesce('tasks, 'submitRun_tasks),
"job_clusters"
-> coalesce('job_clusters, 'submitRun_job_clusters)))
}
}



// val jobRunsLookups = jobRunsInitializeLookups(
// (clusterSpec, clusterSpecNameLookup),
// (clusterSnapshot, clusterSnapNameLookup),
// (jobsStatus, jobStatusMetaLookup),
// (jobsSnapshot, jobSnapNameLookup)
// )

// caching before structifying
jobRunsLag30D
.transformWithDescription(
jobRunsDeriveRunsBase( etlUntilTime))
.transformWithDescription(
jobRunsAppendClusterName( jobRunsLookups))
.transformWithDescription(
jobRunsAppendJobMeta( jobRunsLookups))
.transformWithDescription(
jobRunsStructifyLookupMeta( optimalCacheParts))
.transformWithDescription(
jobRunsAppendTaskAndClusterDetails)
.transformWithDescription(
jobRunsCleanseCreatedNestedStructures( targetKeys))
.transformWithDescription( jobRunsDeriveRunsBase( etlUntilTime))
.transformWithDescription( jobRunsAppendClusterName)
.transformWithDescription( jobRunsAppendJobMeta)
.transformWithDescription( jobRunsStructifyLookupMeta( optimalCacheParts))
.transformWithDescription( jobRunsAppendTaskAndClusterDetails)
.transformWithDescription( jobRunsCleanseCreatedNestedStructures( targetKeys))
.drop("timestamp")
// `timestamp` could be duplicated to enable `asOf` lookups;
// dropping to clean up
@@ -544,13 +544,13 @@ object WorkflowsTransforms extends SparkSessionWrapper {
*/

// val jobRunsLookups: Map[String, DataFrame] =
def jobRunsInitializeLookups(lookups: (PipelineTable, DataFrame)*): Map[String, DataFrame] = {
lookups
.filter(_._1.exists)
.map(lookup => {
(lookup._1.name, lookup._2)
}).toMap
}
// def jobRunsInitializeLookups(lookups: (PipelineTable, DataFrame)*): Map[String, DataFrame] = {
// lookups
// .filter(_._1.exists)
// .map(lookup => {
// (lookup._1.name, lookup._2)
// }).toMap
// }
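For reference, a toy version of what this now-commented-out helper did: keep only the lookups whose backing table exists, keyed by table name. `PipelineTable` here is a hypothetical two-field stand-in for the real Overwatch class:

case class PipelineTable(name: String, exists: Boolean)

def initializeLookups[A](lookups: (PipelineTable, A)*): Map[String, A] =
  lookups
    .filter { case (table, _) => table.exists }        // skip tables not yet materialized
    .map { case (table, lookup) => table.name -> lookup }
    .toMap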

def jobRunsDeriveCompletedRuns(df: DataFrame, firstRunSemanticsW: WindowSpec): DataFrame = {
df
@@ -992,71 +992,43 @@ object WorkflowsTransforms extends SparkSessionWrapper {

}}

/**
* Look up the cluster_name based on id first from
* `job_status_silver`. If not present there fallback to latest
* snapshot prior to the run
*/

val jobRunsAppendClusterName = (lookups: Map[String,DataFrame]) => NamedTransformation {

(df: DataFrame) => {

val runsWClusterNames1 = if (lookups.contains("cluster_spec_silver")) {
df.toTSDF("timestamp", "organization_id", "clusterId")
.lookupWhen(
lookups("cluster_spec_silver")
.toTSDF("timestamp", "organization_id", "clusterId")
).df
} else df

val runsWClusterNames2 = if (lookups.contains("clusters_snapshot_bronze")) {
runsWClusterNames1
.toTSDF("timestamp", "organization_id", "clusterId")
.lookupWhen(
lookups("clusters_snapshot_bronze")
.toTSDF("timestamp", "organization_id", "clusterId")
).df
} else runsWClusterNames1

runsWClusterNames2
}

}


/**
* looks up the job name based on id first from job_status_silver and if not present there fallback to latest
* snapshot prior to the run
*/
val jobRunsAppendJobMeta = (lookups: Map[String, DataFrame]) => NamedTransformation {
(df: DataFrame) => {

val runsWithJobName1 = if (lookups.contains("job_status_silver")) {
df
.toTSDF("timestamp", "organization_id", "jobId")
.lookupWhen(
lookups("job_status_silver")
.toTSDF("timestamp", "organization_id", "jobId")
).df
} else df

val runsWithJobName2 = if (lookups.contains("jobs_snapshot_bronze")) {
runsWithJobName1
.toTSDF("timestamp", "organization_id", "jobId")
.lookupWhen(
lookups("jobs_snapshot_bronze")
.toTSDF("timestamp", "organization_id", "jobId")
).df
} else df

runsWithJobName2
.withColumn("jobName", coalesce('jobName, 'run_name))
.withColumn("tasks", coalesce('tasks, 'submitRun_tasks))
.withColumn("job_clusters", coalesce('job_clusters, 'submitRun_job_clusters))

}
}
// /**
// * Look up the cluster_name based on id, first from
// * `cluster_spec_silver`; if not present there, fall back to the
// * latest snapshot prior to the run.
// */

// val jobRunsAppendClusterName = NamedTransformation {
// (df: DataFrame) => {
// val key = Seq( "timestamp", "organization_id", "clusterId")
// df.toTSDF( key:_*)
// .lookupWhen( clusterSpecNameLookup.toTSDF( key:_*))
// .lookupWhen( clusterSnapNameLookup.toTSDF( key:_*))
// .df
// }
// }


// /**
// * Look up the job name based on id, first from `job_status_silver`;
// * if not present there, fall back to the latest snapshot prior to the run.
// */
// val jobRunsAppendJobMeta = NamedTransformation {
// (df: DataFrame) => {
// val key = Seq( "timestamp", "organization_id", "jobId")
// df.toTSDF( key:_*)
// .lookupWhen( jobStatusMetaLookup.toTSDF( key:_*))
// .lookupWhen( jobSnapNameLookup.toTSDF( key:_*))
// .df
// .withColumns( Map(
// "jobName"
// -> coalesce('jobName, 'run_name),
// "tasks"
// -> coalesce('tasks, 'submitRun_tasks),
// "job_clusters"
// -> coalesce('job_clusters, 'submitRun_job_clusters)))
// }
// }

val jobRunsAppendTaskAndClusterDetails = NamedTransformation { (df: DataFrame) => {
val computeIsSQLWarehouse = $"task_detail.sql_task.warehouse_id".isNotNull

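The fallback semantics described in the doc comments above, take the value from the primary silver table first and fill only what is still missing from the latest prior snapshot, can be sketched with toy types as below. The real implementation is TSDF's as-of `lookupWhen` join, which also respects `timestamp` ordering; none of that machinery is modeled here:

case class Run(jobId: Int, jobName: Option[String])

// Fill jobName from a source only where it is still missing; existing
// values win, mirroring how chained lookupWhen calls layer fallbacks.
def lookupWhen(runs: Seq[Run], source: Map[Int, String]): Seq[Run] =
  runs.map(r => r.copy(jobName = r.jobName.orElse(source.get(r.jobId))))

val statusSilver   = Map(1 -> "etl_daily")             // primary: job_status_silver
val snapshotBronze = Map(1 -> "stale", 2 -> "adhoc")   // fallback: jobs_snapshot_bronze

val runs     = Seq(Run(1, None), Run(2, None), Run(3, None))
val resolved = lookupWhen(lookupWhen(runs, statusSilver), snapshotBronze)
// jobId 1 -> "etl_daily" (primary wins), 2 -> "adhoc" (fallback), 3 -> None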