-
Notifications
You must be signed in to change notification settings - Fork 64
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add descriptive `NamedTransformation`s to Spark UI
#1223
Merged
+208
−26
Merged
Changes from 3 commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
da0c55a
Initial commit
gueniai 1f145aa
Add descriptive job group IDs and named transformations
neilbest-db a6a13fe
improve TransformationDescriberTest
neilbest-db 7346785
flip transformation names to beginning of label
neilbest-db 9a7fddb
revert modified Spark UI Job Group labels
neilbest-db File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
58 changes: 58 additions & 0 deletions
58
src/main/scala/com/databricks/labs/overwatch/utils/TransformationDescriber.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
package com.databricks.labs.overwatch.utils | ||
|
||
import org.apache.spark.sql.Dataset | ||
|
||
// TODO: implement this as a `trait`. Initial attempts would not | ||
// compile because of the dependencies among other `trait`s and | ||
// `object`s that would have to be refactored. | ||
|
||
/**
 * Labels Spark jobs with the name of the transformation that produced them.
 *
 * Importing `TransformationDescriber._` brings [[NamedTransformation]] and
 * the `transformWithDescription` extension method on `Dataset` into scope.
 */
object TransformationDescriber {

  /**
   * A `Dataset[T] => Dataset[U]` function paired with a name.
   *
   * The name comes from the implicit `sourcecode.Name` resolved where the
   * instance is constructed.
   *
   * @param transformation the wrapped function
   * @param _name          supplies the name reported for this transformation
   * @tparam T element type of the input `Dataset`
   * @tparam U element type of the output `Dataset`
   */
  class NamedTransformation[T, U](
    val transformation: Dataset[T] => Dataset[U])(
    implicit _name: sourcecode.Name) {

    /** The name captured at construction time. */
    final val name: String = _name.value

    // Built from `name` rather than `_name` so the implicit argument is
    // only needed while the instance is being constructed.
    override def toString: String = s"NamedTransformation $name"

  }

  object NamedTransformation {

    /**
     * Wraps `transformation` in a [[NamedTransformation]], forwarding the
     * implicit `sourcecode.Name` resolved at the call site.
     */
    def apply[T, U](
      transformation: Dataset[T] => Dataset[U])(
      implicit name: sourcecode.Name): NamedTransformation[T, U] =
      new NamedTransformation(transformation)(name)

  }

  /**
   * Adds `transformWithDescription` to any `Dataset[T]`.
   *
   * Only the input element type is a class-level parameter; the output
   * type belongs to the method, where it is inferred from the
   * transformation that is passed in.
   */
  implicit class TransformationDescriber[T](ds: Dataset[T]) {

    /**
     * Applies `namedTransformation` to the wrapped `Dataset` after setting
     * the job description and call site on the `SparkContext`.
     *
     * The job description is `namedTransformation.toString`; the call site
     * is `"<name> at <fileName>:<line>"`, built from the implicit
     * `sourcecode` values. Neither property is reset before returning.
     *
     * @param namedTransformation the transformation to apply
     * @param name     supplies the first component of the call-site string
     * @param fileName supplies the file component of the call-site string
     * @param line     supplies the line component of the call-site string
     * @tparam U element type of the resulting `Dataset`
     * @return the result of `ds.transform` with the wrapped function
     */
    def transformWithDescription[U](
      namedTransformation: NamedTransformation[T, U])(
      implicit
      name: sourcecode.Name,
      fileName: sourcecode.FileName,
      line: sourcecode.Line
    ): Dataset[U] = {

      val callSite = s"${name.value} at ${fileName.value}:${line.value}"

      val sc = ds.sparkSession.sparkContext
      sc.setJobDescription(namedTransformation.toString)
      sc.setCallSite(callSite)

      ds.transform(namedTransformation.transformation)

    }

  }

}
102 changes: 102 additions & 0 deletions
102
src/test/scala/com/databricks/labs/overwatch/utils/TransformationDescriberTest.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
package com.databricks.labs.overwatch.utils | ||
|
||
import com.databricks.labs.overwatch.SparkSessionTestWrapper | ||
import org.apache.spark.sql.DataFrame | ||
import org.scalatest.funspec.AnyFunSpec | ||
import org.scalatest.GivenWhenThen | ||
import java.io.ByteArrayOutputStream | ||
|
||
/**
 * Exercises `NamedTransformation` and the `transformWithDescription`
 * extension method provided by `TransformationDescriber`.
 */
class TransformationDescriberTest
  extends AnyFunSpec
  with GivenWhenThen
  with SparkSessionTestWrapper {

  import TransformationDescriber._
  import spark.implicits._
  spark.conf.set("spark.sql.session.timeZone", "UTC")

  val t = (df: DataFrame) => df.select($"foo")

  // Must remain a `val` called `nt`: the assertions below expect the
  // captured name to be exactly "nt".
  val nt = NamedTransformation(t)

  /**
   * Renders the first row of `df` with `show` and forwards the captured
   * console output to `info`, one line at a time.
   */
  private def infoShow(df: DataFrame): Unit = {
    val captured = new ByteArrayOutputStream
    Console.withOut(captured) {
      df.show(numRows = 1, truncate = 0, vertical = true)
    }
    captured.toString.linesIterator.foreach(info(_))
  }

  describe("A NamedTransformation") {

    it("wraps a function literal") {

      info(s"nt.transformation: ${nt.transformation}")

      assert(nt.transformation === t)

    }

    it("knows its own name") {

      info(s"`nt.name`: ${nt.name}")
      info(s"`nt.toString`: ${nt.toString}")

      assert(nt.name === "nt")
      assert(nt.toString === "NamedTransformation nt")

    }

    // The scenario lives inside a test clause so that its assertions are
    // reported as a test result instead of running while the suite is
    // being registered.
    it("sets the Spark job description when applied to a Dataset") {

      Given("a Spark `Dataset` (including `DataFrame`s)")

      val in = Seq(("foo", "bar")).toDF("foo", "bar")
      infoShow(in)

      When("a `NamedTransformation` is applied")

      val out = in.transformWithDescription(nt)
      infoShow(out)

      Then("the Spark job description matches the transformation")

      val sjd = out.sparkSession.sparkContext.getLocalProperty("spark.job.description")

      info(s"spark.job.description: ${sjd}")

      assert(sjd === "NamedTransformation nt")

      And("the result of the transformation is correct")

      assertResult("`foo` STRING") {
        out.schema.toDDL
      }

      assertResult("foo") {
        out.first.getString(0)
      }

    }

  }

}
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@neilbest-db This is not a licensed module, so we can't use it under the Databricks Labs standard.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@souravbaner-da, I just noticed your comment from 2024-06-05. I discussed this with @sriram251-code here in Slack back on 2024-06-04. I believe we have also discussed it on at least one team call since then. If it's still a concern, I would like to learn more about the standard you mention. Is it documented somewhere?