Add descriptive NamedTransformations to Spark UI #1223

Merged: 5 commits, Aug 20, 2024
Changes from 3 commits
5 changes: 3 additions & 2 deletions build.sbt
@@ -2,7 +2,7 @@ name := "overwatch"

organization := "com.databricks.labs"

version := "0.8.1.1"
version := "0.8.2.0"

scalaVersion := "2.12.12"
scalacOptions ++= Seq("-Xmax-classfile-name", "78")
@@ -18,6 +18,7 @@ libraryDependencies += "com.databricks" % "dbutils-api_2.12" % "0.0.5" % Provided
libraryDependencies += "com.amazonaws" % "aws-java-sdk-s3" % "1.11.595" % Provided
libraryDependencies += "io.delta" % "delta-core_2.12" % "1.0.0" % Provided
libraryDependencies += "org.scalaj" %% "scalaj-http" % "2.4.2"
libraryDependencies += "com.lihaoyi" %% "sourcecode" % "0.4.1"
Contributor @souravbaner-da:
@neilbest-db This is not a licensed module, so we can't use it per the Databricks Labs standard.

Contributor Author @neilbest-db:
@souravbaner-da, I just noticed your comment from 2024-06-05. I discussed this with @sriram251-code here in Slack back on 2024-06-04. I believe we have also discussed it on at least one team call since then. If it's still a concern, I would like to learn more about the standard you mention. Is that in a document somewhere?
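For context on the dependency under discussion: sourcecode is a compile-time macro library, so nothing is resolved at runtime. A minimal sketch of the behavior this PR relies on (the whoAmI helper below is illustrative, not part of the PR):

import sourcecode

object SourcecodeNameDemo extends App {

  // sourcecode.Name materializes, at compile time, the name of the
  // nearest enclosing named definition at the call site.
  def whoAmI(implicit name: sourcecode.Name): String = name.value

  val myTransformation = whoAmI

  println(myTransformation)  // prints "myTransformation"
}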


//libraryDependencies += "org.apache.hive" % "hive-metastore" % "2.3.9"

@@ -51,4 +52,4 @@ assemblyMergeStrategy in assembly := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false)
ETLDefinition.scala
@@ -27,7 +27,10 @@ class ETLDefinition(

val transformedDF = transforms.foldLeft(verifiedSourceDF) {
case (df, transform) =>
-      df.transform(transform)
+      df.sparkSession.sparkContext.setJobGroup(
+        s"${module.pipeline.config.workspaceName}:${module.moduleName}",
+        transform.toString)
+      df.transform( transform)
}
write(transformedDF, module)
}
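The setJobGroup call above is what surfaces the workspace and module names in the Spark UI: the group ID and description become thread-local properties that every job triggered by a subsequent action inherits. A minimal sketch of that mechanism, assuming a local SparkSession (the workspace and module names are illustrative):

import org.apache.spark.sql.SparkSession

object JobGroupDemo extends App {

  val spark = SparkSession.builder.master("local[*]").appName("demo").getOrCreate()
  val sc = spark.sparkContext

  // Mirrors the call in ETLDefinition: "<workspaceName>:<moduleName>"
  // as the group ID, the transform's toString as the description.
  sc.setJobGroup("myWorkspace:JobRuns_Silver", "jobRunsAppendClusterName")

  spark.range(100).count()  // this job carries the group and description in the UI

  // The same values can be read back as local properties, which is what
  // the test at the bottom of this PR asserts against.
  println(sc.getLocalProperty("spark.jobGroup.id"))     // myWorkspace:JobRuns_Silver
  println(sc.getLocalProperty("spark.job.description")) // jobRunsAppendClusterName
}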
SilverTransforms.scala
@@ -12,6 +12,7 @@ import org.apache.spark.sql.{Column, DataFrame}

trait SilverTransforms extends SparkSessionWrapper {

+import TransformationDescriber._
import spark.implicits._

private val logger: Logger = Logger.getLogger(this.getClass)
@@ -1363,7 +1364,8 @@ trait SilverTransforms extends SparkSessionWrapper {

// caching before structifying
jobRunsDeriveRunsBase(jobRunsLag30D, etlUntilTime)
-  .transform(jobRunsAppendClusterName(jobRunsLookups))
+  .transformWithDescription(
+    jobRunsAppendClusterName( jobRunsLookups))
.transform(jobRunsAppendJobMeta(jobRunsLookups))
.transform(jobRunsStructifyLookupMeta(optimalCacheParts))
.transform(jobRunsAppendTaskAndClusterDetails)
WorkflowsTransforms.scala
@@ -12,6 +12,7 @@ import org.apache.spark.sql.{Column, DataFrame}

object WorkflowsTransforms extends SparkSessionWrapper {

+import TransformationDescriber._
import spark.implicits._

/**
@@ -990,31 +991,38 @@ object WorkflowsTransforms extends SparkSessionWrapper {
}

/**
- * looks up the cluster_name based on id first from job_status_silver and if not present there fallback to latest
- * snapshot prior to the run
- */
-def jobRunsAppendClusterName(lookups: Map[String, DataFrame])(df: DataFrame): DataFrame = {
-
-  val runsWClusterNames1 = if (lookups.contains("cluster_spec_silver")) {
-    df.toTSDF("timestamp", "organization_id", "clusterId")
-      .lookupWhen(
-        lookups("cluster_spec_silver")
-          .toTSDF("timestamp", "organization_id", "clusterId")
-      ).df
-  } else df
-
-  val runsWClusterNames2 = if (lookups.contains("clusters_snapshot_bronze")) {
-    runsWClusterNames1
-      .toTSDF("timestamp", "organization_id", "clusterId")
-      .lookupWhen(
-        lookups("clusters_snapshot_bronze")
-          .toTSDF("timestamp", "organization_id", "clusterId")
-      ).df
-  } else runsWClusterNames1
-
-  runsWClusterNames2
-}
+ * Look up the cluster_name based on id, first from
+ * `job_status_silver`. If not present there, fall back to the latest
+ * snapshot prior to the run.
+ */
+
+val jobRunsAppendClusterName = (lookups: Map[String,DataFrame]) => NamedTransformation {
+
+  (df: DataFrame) => {
+
+    val runsWClusterNames1 = if (lookups.contains("cluster_spec_silver")) {
+      df.toTSDF("timestamp", "organization_id", "clusterId")
+        .lookupWhen(
+          lookups("cluster_spec_silver")
+            .toTSDF("timestamp", "organization_id", "clusterId")
+        ).df
+    } else df
+
+    val runsWClusterNames2 = if (lookups.contains("clusters_snapshot_bronze")) {
+      runsWClusterNames1
+        .toTSDF("timestamp", "organization_id", "clusterId")
+        .lookupWhen(
+          lookups("clusters_snapshot_bronze")
+            .toTSDF("timestamp", "organization_id", "clusterId")
+        ).df
+    } else runsWClusterNames1
+
+    runsWClusterNames2
+  }
+
+}


/**
* looks up the job name based on id first from job_status_silver and if not present there fallback to latest
* snapshot prior to the run
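A note on the rewrite of jobRunsAppendClusterName above: sourcecode.Name skips anonymous functions and resolves to the nearest enclosing named definition, so the NamedTransformation constructed inside the lambda should report the val's own name. A sketch of the expected behavior (lookups is assumed to be in scope):

val nt = jobRunsAppendClusterName(lookups)

nt.name      // expected: "jobRunsAppendClusterName"
nt.toString  // expected: "NamedTransformation jobRunsAppendClusterName"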
TransformationDescriber.scala (new file)
@@ -0,0 +1,58 @@
package com.databricks.labs.overwatch.utils

import org.apache.spark.sql.Dataset

// TODO: implement this as a `trait`. Initial attempts would not
// compile because of the dependencies among other `trait`s and
// `object`s that would have to be refactored.

object TransformationDescriber {


class NamedTransformation[T,U](
val transformation: Dataset[T] => Dataset[U])(
implicit _name: sourcecode.Name) {

final val name: String = _name.value

override def toString = s"NamedTransformation ${_name.value}"

}


object NamedTransformation {

def apply[T,U](
transformation: Dataset[T] => Dataset[U])(
implicit name: sourcecode.Name) =
new NamedTransformation( transformation)( name)

}


implicit class TransformationDescriber[T,U]( ds: Dataset[T]) {

def transformWithDescription[U](
namedTransformation: NamedTransformation[T,U])(
implicit
// enclosing: sourcecode.Enclosing,
name: sourcecode.Name,
fileName: sourcecode.FileName,
line: sourcecode.Line
): Dataset[U] = {

// println( s"Inside TransformationDescriber.transformWithDescription: $enclosing")

val callSite = s"${name.value} at ${fileName.value}:${line.value}"

val sc = ds.sparkSession.sparkContext
sc.setJobDescription( namedTransformation.toString)
sc.setCallSite( callSite)

ds.transform( namedTransformation.transformation)

}

}

}
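To make the intended usage concrete outside the Overwatch pipeline, here is a minimal, self-contained sketch (assuming a local SparkSession; addGreeting is a hypothetical transformation, not part of this PR):

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit
import com.databricks.labs.overwatch.utils.TransformationDescriber._

object NamedTransformationDemo extends App {

  val spark = SparkSession.builder.master("local[*]").appName("demo").getOrCreate()
  import spark.implicits._

  // The val's name is captured at compile time by sourcecode.Name, so
  // this value prints as "NamedTransformation addGreeting".
  val addGreeting = NamedTransformation { (df: DataFrame) =>
    df.withColumn("greeting", lit("hello"))
  }

  val out = Seq(1, 2, 3).toDF("n").transformWithDescription(addGreeting)

  out.show()  // the jobs behind this action carry the description in the Spark UI
}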
TransformationDescriberTest.scala (new file)
@@ -0,0 +1,102 @@
package com.databricks.labs.overwatch.utils

import com.databricks.labs.overwatch.SparkSessionTestWrapper
import org.apache.spark.sql.DataFrame
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.GivenWhenThen
import java.io.ByteArrayOutputStream

class TransformationDescriberTest
extends AnyFunSpec
with GivenWhenThen
with SparkSessionTestWrapper {

import TransformationDescriber._
import spark.implicits._
spark.conf.set("spark.sql.session.timeZone", "UTC")

val t = (df: DataFrame) => df.select( $"foo")

val nt = NamedTransformation( t)

// TODO: replace use of `s` and `Console.withOut` with an abstraction

val s = new ByteArrayOutputStream

describe( "A NamedTransformation") {

it( "wraps a function literal") {

info( s"nt.transformation: ${nt.transformation}")

assert( nt.transformation === t)

}

it( "knows its own name") {

info( s"`nt.name`: ${nt.name}")
info( s"`nt.toString`: ${nt.toString}")

assert( nt.name === "nt")
assert( nt.toString === "NamedTransformation nt")

}

Given( "a Spark `Dataset` (including `DataFrame`s)")

val in = Seq( ("foo", "bar")).toDF( "foo", "bar")

Console.withOut( s) {
in.show(numRows= 1, truncate= 0, vertical= true)
}
// info( s.toString)
s.toString.linesIterator.foreach( info(_))
s.reset

When( "a `NamedTransformation` is applied")

val out = in.transformWithDescription( nt)

// val s = new ByteArrayOutputStream
Console.withOut( s) {
out.show(numRows= 1, truncate= 0, vertical= true)
}
// info( s.toString)
s.toString.linesIterator.foreach( info(_))



Then( "the resulting Spark jobs have a matching description (pending)")

// info( s"""spark.jobGroup.id: ${out.sparkSession.sparkContext.getLocalProperty( "spark.jobGroup.id")}""")

val sjd = out.sparkSession.sparkContext.getLocalProperty( "spark.job.description")

info( s"spark.job.description: ${sjd}")

assert( sjd === "NamedTransformation nt")

// info( s"""spark.callSite.short: ${out.sparkSession.sparkContext.getLocalProperty( "spark.callSite.short")}""")
// info( s"""spark.callSite.long: ${out.sparkSession.sparkContext.getLocalProperty( "spark.callSite.long")}""")






And( "the result of the transformation is correct")

assertResult( "`foo` STRING") {
out.schema.toDDL
}

assertResult( "foo") {
out.first.getString(0)
}


}


}