Skip to content

Commit

Permalink
Add extension method to show DataFrame records in the log
Browse files Browse the repository at this point in the history
  • Loading branch information
neilbest-db committed May 23, 2024
1 parent da0c55a commit 7158765
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Column, DataFrame}


trait SilverTransforms extends SparkSessionWrapper {
trait SilverTransforms extends SparkSessionWrapper with DataFrameSyntax {

import spark.implicits._

Expand Down Expand Up @@ -1313,6 +1313,15 @@ trait SilverTransforms extends SparkSessionWrapper {
// eagerly force this highly reused DF into cache()
jobRunsLag30D.count()


// TODO: remove or comment out or change log level or . . .

logger.log( Level.INFO, "Showing first 5 rows of `jobRunsLag30D`:")

jobRunsLag30D
.showLines(5, 20, true)
.foreach( logger.log( Level.INFO, _))

// Lookup to populate the clusterID/clusterName where missing from jobs
lazy val clusterSpecNameLookup = clusterSpec.asDF
.select('organization_id, 'timestamp, 'cluster_name, 'cluster_id.alias("clusterId"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.databricks.labs.overwatch.utils

import org.apache.log4j.{Level,Logger}
import org.apache.spark.sql.Dataset
import java.io.ByteArrayOutputStream

trait DataFrameSyntax {

/**
* Implements extension methods for Spark `Dataset[T]`s (including
* `DataFrame`s) that mimic Spark's built-in `Dataset.show()`
* family of overloaded methods.
*
* Instead of producing output on the console, they return an
* `Iterator[String]` suitable for redirecting output to a logger,
* for example, like this:
*
* {{{
* debugDF
* .showLines( debugDF.count.toInt, 0, true)
* .foreach( logger.log( Level.INFO, _))
* }}}
*
* Produces log entries according to your logger configuration:
*
* {{{
* 24/05/04 00:18:16 INFO Silver: -RECORD 0---------------------------------
* 24/05/04 00:18:16 INFO Silver: organization_id | 2753962522174656
* 24/05/04 00:18:16 INFO Silver: jobId | 903015066329560
* 24/05/04 00:18:16 INFO Silver: fromMS | 1709419775381
* . . .
* 24/05/04 00:18:16 INFO Silver: -RECORD 1---------------------------------
* . . .
* }}}
*/

implicit class DataFrameShower[T]( val df: Dataset[T]) {

def showLines(): Iterator[String] =
showLines( 20)

def showLines( numRows: Int): Iterator[String] =
showLines( numRows, truncate = true)

def showLines( truncate: Boolean): Iterator[String] =
showLines( 20, truncate)

def showLines( numRows: Int, truncate: Boolean): Iterator[String] =
if( truncate) {
showLines( numRows, truncate = 20)
} else {
showLines( numRows, truncate = 0)
}

def showLines( numRows: Int, truncate: Int): Iterator[String] =
showLines( numRows, truncate, vertical = false)

def showLines( numRows: Int, truncate: Int, vertical: Boolean): Iterator[String] = {
val out = new ByteArrayOutputStream
Console.withOut( out) {
df.show( numRows, truncate, vertical) }
val lines = out.toString.linesIterator
out.reset()
lines

}
}
}

0 comments on commit 7158765

Please sign in to comment.