diff --git a/src/main/scala/com/databricks/labs/overwatch/pipeline/SilverTransforms.scala b/src/main/scala/com/databricks/labs/overwatch/pipeline/SilverTransforms.scala
index 78e8ee54c..20cb1c588 100644
--- a/src/main/scala/com/databricks/labs/overwatch/pipeline/SilverTransforms.scala
+++ b/src/main/scala/com/databricks/labs/overwatch/pipeline/SilverTransforms.scala
@@ -1323,74 +1323,124 @@ trait SilverTransforms extends SparkSessionWrapper with DataFrameSyntax {
         .showLines(5, 20, true)
         .foreach( logger.log( Level.INFO, _))
 
-      // Lookup to populate the clusterID/clusterName where missing from jobs
-      lazy val clusterSpecNameLookup =
-        clusterSpec.asDF.transformWithDescription(
-          NamedTransformation { (df: DataFrame) => {
-            df.select( 'organization_id, 'timestamp, 'cluster_name, 'cluster_id.alias("clusterId"))
-              .filter('clusterId.isNotNull && 'cluster_name.isNotNull)}})
-
-      // Lookup to populate the clusterID/clusterName where missing from jobs
-      lazy val clusterSnapNameLookup =
-        clusterSnapshot.asDF.transformWithDescription(
-          NamedTransformation { (df: DataFrame) => {
-            df.withColumn("timestamp", unix_timestamp('Pipeline_SnapTS) * lit(1000))
-              .select('organization_id, 'timestamp, 'cluster_name, 'cluster_id.alias("clusterId"))
-              .filter('clusterId.isNotNull && 'cluster_name.isNotNull)}})
-
-      // Lookup to populate the existing_cluster_id where missing from jobs -- it can be derived from name
-      lazy val jobStatusMetaLookup =
-        jobsStatus.asDF.transformWithDescription(
-          NamedTransformation { (df: DataFrame) => {
-            df.verifyMinimumSchema( Schema.minimumJobStatusSilverMetaLookupSchema)
-              .select(
-                'organization_id,
-                'timestamp,
-                'jobId,
-                'jobName,
-                to_json('tags).alias("tags"),
-                'schedule,
-                'max_concurrent_runs,
-                'run_as_user_name,
-                'timeout_seconds,
-                'created_by,
-                'last_edited_by,
-                to_json('tasks).alias("tasks"),
-                to_json('job_clusters).alias("job_clusters"),
-                to_json($"task_detail_legacy.notebook_task").alias("notebook_task"),
-                to_json($"task_detail_legacy.spark_python_task").alias("spark_python_task"),
-                to_json($"task_detail_legacy.python_wheel_task").alias("python_wheel_task"),
-                to_json($"task_detail_legacy.spark_jar_task").alias("spark_jar_task"),
-                to_json($"task_detail_legacy.spark_submit_task").alias("spark_submit_task"),
-                to_json($"task_detail_legacy.shell_command_task").alias("shell_command_task"),
-                to_json($"task_detail_legacy.pipeline_task").alias("pipeline_task")
-              )}})
-
-      lazy val jobSnapNameLookup = jobsSnapshot.asDF
-        .withColumn("timestamp", unix_timestamp('Pipeline_SnapTS) * lit(1000))
-        .select('organization_id, 'timestamp, 'job_id.alias("jobId"), $"settings.name".alias("jobName"))
-
-      val jobRunsLookups = jobRunsInitializeLookups(
-        (clusterSpec, clusterSpecNameLookup),
-        (clusterSnapshot, clusterSnapNameLookup),
-        (jobsStatus, jobStatusMetaLookup),
-        (jobsSnapshot, jobSnapNameLookup)
-      )
+      /**
+       * Look up the cluster_name based on id, first from
+       * `cluster_spec_silver`; if not present there, fall back to the
+       * latest snapshot prior to the run.
+       */
+
+      val jobRunsAppendClusterName = NamedTransformation {
+        (df: DataFrame) => {
+
+          val key = Seq( "organization_id", "clusterId")
+
+          lazy val clusterSpecNameLookup = clusterSpec.asDF
+            .select(
+              'organization_id,
+              'timestamp,
+              'cluster_name,
+              'cluster_id.alias("clusterId"))
+            .filter(
+              'clusterId.isNotNull
+                && 'cluster_name.isNotNull)
+            .toTSDF( "timestamp", key:_*)
+
+          lazy val clusterSnapNameLookup = clusterSnapshot.asDF
+            .select(
+              'organization_id,
+              ( unix_timestamp('Pipeline_SnapTS) * lit(1000)).alias( "timestamp"),
+              'cluster_name,
+              'cluster_id.alias( "clusterId"))
+            .filter(
+              'clusterId.isNotNull
+                && 'cluster_name.isNotNull)
+            .toTSDF( "timestamp", key:_*)
+
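+          // In essence, each `lookupWhen` is an "as of" join: null
+          // cluster_names are filled from the lookup row with the greatest
+          // timestamp at or before the event's own, per
+          // ( organization_id, clusterId). The spec table is consulted
+          // first; the snapshot only fills what is still null afterwards.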
+          df.toTSDF( "timestamp", key:_*)
+            .lookupWhen( clusterSpecNameLookup)
+            .lookupWhen( clusterSnapNameLookup)
+            .df
+        }
+      }
+
+      /**
+       * Look up the job name and job metadata based on id, first from
+       * `job_status_silver`; if not present there, fall back to the
+       * latest snapshot prior to the run.
+       */
+
+      val jobRunsAppendJobMeta = NamedTransformation {
+        (df: DataFrame) => {
+
+          val key = Seq( "organization_id", "jobId")
+
+          lazy val jobStatusMetaLookup = jobsStatus.asDF
+            .verifyMinimumSchema(
+              Schema.minimumJobStatusSilverMetaLookupSchema)
+            .select(
+              'organization_id,
+              'timestamp,
+              'jobId,
+              'jobName,
+              to_json('tags).alias("tags"),
+              'schedule,
+              'max_concurrent_runs,
+              'run_as_user_name,
+              'timeout_seconds,
+              'created_by,
+              'last_edited_by,
+              to_json( 'tasks).alias("tasks"),
+              to_json( 'job_clusters).alias("job_clusters"),
+              to_json( $"task_detail_legacy.notebook_task").alias( "notebook_task"),
+              to_json( $"task_detail_legacy.spark_python_task").alias( "spark_python_task"),
+              to_json( $"task_detail_legacy.python_wheel_task").alias( "python_wheel_task"),
+              to_json( $"task_detail_legacy.spark_jar_task").alias( "spark_jar_task"),
+              to_json( $"task_detail_legacy.spark_submit_task").alias( "spark_submit_task"),
+              to_json( $"task_detail_legacy.shell_command_task").alias( "shell_command_task"),
+              to_json( $"task_detail_legacy.pipeline_task").alias( "pipeline_task"))
+            .toTSDF( "timestamp", key:_*)
+
+          lazy val jobSnapshotNameLookup = jobsSnapshot.asDF
+            .select(
+              'organization_id,
+              ( unix_timestamp( 'Pipeline_SnapTS) * lit( 1000)).alias( "timestamp"),
+              'job_id.alias("jobId"),
+              $"settings.name".alias("jobName"))
+            .toTSDF( "timestamp", key:_*)
+
+          df.toTSDF( "timestamp", key:_*)
+            .lookupWhen( jobStatusMetaLookup)
+            .lookupWhen( jobSnapshotNameLookup)
+            .df
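+            // Runs submitted via the one-time `runs/submit` API carry no
+            // job-level metadata; for those, fall back to the run-scoped
+            // fields captured on the run itself.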
+            .withColumns( Map(
+              "jobName"
+                -> coalesce('jobName, 'run_name),
+              "tasks"
+                -> coalesce('tasks, 'submitRun_tasks),
+              "job_clusters"
+                -> coalesce('job_clusters, 'submitRun_job_clusters)))
+        }
+      }
+
+      // val jobRunsLookups = jobRunsInitializeLookups(
+      //   (clusterSpec, clusterSpecNameLookup),
+      //   (clusterSnapshot, clusterSnapNameLookup),
+      //   (jobsStatus, jobStatusMetaLookup),
+      //   (jobsSnapshot, jobSnapNameLookup)
+      // )
 
       // caching before structifying
       jobRunsLag30D
-        .transformWithDescription(
-          jobRunsDeriveRunsBase( etlUntilTime))
-        .transformWithDescription(
-          jobRunsAppendClusterName( jobRunsLookups))
-        .transformWithDescription(
-          jobRunsAppendJobMeta( jobRunsLookups))
-        .transformWithDescription(
-          jobRunsStructifyLookupMeta( optimalCacheParts))
-        .transformWithDescription(
-          jobRunsAppendTaskAndClusterDetails)
-        .transformWithDescription(
-          jobRunsCleanseCreatedNestedStructures( targetKeys))
+        .transformWithDescription( jobRunsDeriveRunsBase( etlUntilTime))
+        .transformWithDescription( jobRunsAppendClusterName)
+        .transformWithDescription( jobRunsAppendJobMeta)
+        .transformWithDescription( jobRunsStructifyLookupMeta( optimalCacheParts))
+        .transformWithDescription( jobRunsAppendTaskAndClusterDetails)
+        .transformWithDescription( jobRunsCleanseCreatedNestedStructures( targetKeys))
         .drop("timestamp")
         // `timestamp` could be duplicated to enable `asOf` lookups;
         // dropping to clean up
diff --git a/src/main/scala/com/databricks/labs/overwatch/pipeline/WorkflowsTransforms.scala b/src/main/scala/com/databricks/labs/overwatch/pipeline/WorkflowsTransforms.scala
index 9472ed70b..b728514b8 100644
--- a/src/main/scala/com/databricks/labs/overwatch/pipeline/WorkflowsTransforms.scala
+++ b/src/main/scala/com/databricks/labs/overwatch/pipeline/WorkflowsTransforms.scala
@@ -544,13 +544,13 @@ object WorkflowsTransforms extends SparkSessionWrapper {
    */
 
   // val jobRunsLookups: Map[String, DataFrame] =
-  def jobRunsInitializeLookups(lookups: (PipelineTable, DataFrame)*): Map[String, DataFrame] = {
-    lookups
-      .filter(_._1.exists)
-      .map(lookup => {
-        (lookup._1.name, lookup._2)
-      }).toMap
-  }
+  // def jobRunsInitializeLookups(lookups: (PipelineTable, DataFrame)*): Map[String, DataFrame] = {
+  //   lookups
+  //     .filter(_._1.exists)
+  //     .map(lookup => {
+  //       (lookup._1.name, lookup._2)
+  //     }).toMap
+  // }
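+  // The lookup map is no longer assembled here: each NamedTransformation
+  // in SilverTransforms now builds its lookups inline and applies them
+  // with `toTSDF(...).lookupWhen(...)`.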
 
   def jobRunsDeriveCompletedRuns(df: DataFrame, firstRunSemanticsW: WindowSpec): DataFrame = {
     df
@@ -992,71 +992,43 @@ object WorkflowsTransforms extends SparkSessionWrapper {
   }}
 
-  /**
-   * Look up the cluster_name based on id first from
-   * `job_status_silver`. If not present there fallback to latest
-   * snapshot prior to the run
-   */
-
-  val jobRunsAppendClusterName = (lookups: Map[String,DataFrame]) => NamedTransformation {
-
-    (df: DataFrame) => {
-
-      val runsWClusterNames1 = if (lookups.contains("cluster_spec_silver")) {
-        df.toTSDF("timestamp", "organization_id", "clusterId")
-          .lookupWhen(
-            lookups("cluster_spec_silver")
-              .toTSDF("timestamp", "organization_id", "clusterId")
-          ).df
-      } else df
-
-      val runsWClusterNames2 = if (lookups.contains("clusters_snapshot_bronze")) {
-        runsWClusterNames1
-          .toTSDF("timestamp", "organization_id", "clusterId")
-          .lookupWhen(
-            lookups("clusters_snapshot_bronze")
-              .toTSDF("timestamp", "organization_id", "clusterId")
-          ).df
-      } else runsWClusterNames1
-
-      runsWClusterNames2
-    }
-
-  }
-
-
-  /**
-   * looks up the job name based on id first from job_status_silver and if not present there fallback to latest
-   * snapshot prior to the run
-   */
-  val jobRunsAppendJobMeta = (lookups: Map[String, DataFrame]) => NamedTransformation {
-    (df: DataFrame) => {
-
-      val runsWithJobName1 = if (lookups.contains("job_status_silver")) {
-        df
-          .toTSDF("timestamp", "organization_id", "jobId")
-          .lookupWhen(
-            lookups("job_status_silver")
-              .toTSDF("timestamp", "organization_id", "jobId")
-          ).df
-      } else df
-
-      val runsWithJobName2 = if (lookups.contains("jobs_snapshot_bronze")) {
-        runsWithJobName1
-          .toTSDF("timestamp", "organization_id", "jobId")
-          .lookupWhen(
-            lookups("jobs_snapshot_bronze")
-              .toTSDF("timestamp", "organization_id", "jobId")
-          ).df
-      } else df
-
-      runsWithJobName2
-        .withColumn("jobName", coalesce('jobName, 'run_name))
-        .withColumn("tasks", coalesce('tasks, 'submitRun_tasks))
-        .withColumn("job_clusters", coalesce('job_clusters, 'submitRun_job_clusters))
-
-    }
-  }
+  // /**
+  // * Look up the cluster_name based on id, first from
+  // * `cluster_spec_silver`; if not present there, fall back to the
+  // * latest snapshot prior to the run.
+  // */
+
+  // val jobRunsAppendClusterName = NamedTransformation {
+  //   (df: DataFrame) => {
+  //     val key = Seq( "timestamp", "organization_id", "clusterId")
+  //     df.toTSDF( key:_*)
+  //       .lookupWhen( clusterSpecNameLookup.toTSDF( key:_*))
+  //       .lookupWhen( clusterSnapNameLookup.toTSDF( key:_*))
+  //       .df
+  //   }
+  // }
+
+  // /**
+  // * Look up the job name and job metadata based on id, first from
+  // * `job_status_silver`; if not present there, fall back to the
+  // * latest snapshot prior to the run.
+  // */
+  // val jobRunsAppendJobMeta = NamedTransformation {
+  //   (df: DataFrame) => {
+  //     val key = Seq( "timestamp", "organization_id", "jobId")
+  //     df.toTSDF( key:_*)
+  //       .lookupWhen( jobStatusMetaLookup.toTSDF( key:_*))
+  //       .lookupWhen( jobSnapNameLookup.toTSDF( key:_*))
+  //       .df
+  //       .withColumns( Map(
+  //         "jobName"
+  //           -> coalesce('jobName, 'run_name),
+  //         "tasks"
+  //           -> coalesce('tasks, 'submitRun_tasks),
+  //         "job_clusters"
+  //           -> coalesce('job_clusters, 'submitRun_job_clusters)))
+  //   }
+  // }
 
   val jobRunsAppendTaskAndClusterDetails = NamedTransformation { (df: DataFrame) => {
     val computeIsSQLWarehouse = $"task_detail.sql_task.warehouse_id".isNotNull