parSession additional Logs
GeekSheikh committed Jan 11, 2023
1 parent 01e2192 commit c9dee5c
Showing 3 changed files with 27 additions and 4 deletions.
MultiWorkspaceDeployment.scala
@@ -370,7 +370,9 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
.performMandatoryValidation(multiWorkspaceConfig, parallelism)
.map(buildParams)
println("Workspace to be Deployed :" + params.size)
// if (parallelism > 1)
+SparkSessionWrapper.parSessionsOn = true
+SparkSessionWrapper.sessionsMap.clear()
val zoneArray = zones.split(",")
zoneArray.foreach(zone => {
val responseCounter = Collections.synchronizedList(new util.ArrayList[Int]())
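The two lines added above flip the shared wrapper into per-thread session mode and empty the session cache before the zone deployments fan out in parallel. As a minimal sketch — a simplified stand-in for the real SparkSessionWrapper companion object, names assumed — the state being toggled looks roughly like this:

  import java.util.concurrent.ConcurrentHashMap
  import scala.collection.JavaConverters._
  import org.apache.spark.sql.SparkSession

  // Simplified stand-in for the companion-object state; not the actual source.
  object SessionState {
    // When true, spark() hands each worker thread its own session
    // instead of the single global one.
    @volatile var parSessionsOn: Boolean = false

    // Thread ID -> session. Emptied before each parallel deployment so a
    // rerun never inherits sessions built for the threads of a previous run.
    val sessionsMap: scala.collection.concurrent.Map[Long, SparkSession] =
      new ConcurrentHashMap[Long, SparkSession]().asScala
  }

Clearing the map up front matters because the JVM may reuse thread IDs; a stale entry could otherwise hand a fresh deployment a session configured by an earlier one.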
22 changes: 19 additions & 3 deletions src/main/scala/com/databricks/labs/overwatch/env/Database.scala
@@ -264,12 +264,23 @@ class Database(config: Config) extends SparkSessionWrapper {
maxMergeScanDates: Array[String] = Array()): this.type = {
@tailrec def executeRetry(retryCount: Int): this.type = {
val rerunFlag = try {
logger.log(Level.INFO, s"EXECUTING WRITE FOR THREAD ${Thread.currentThread().getId} attempt #: $retryCount\n " +
s"Delta Schema AutoMerge Conf: ${spark.conf.get("spark.databricks.delta.schema.autoMerge.enabled")}")
write(inputDf, target, pipelineSnapTime, maxMergeScanDates)
false
} catch {
case e: Throwable =>
val exceptionMsg = e.getMessage.toLowerCase()
-if (exceptionMsg != null && (exceptionMsg.contains("concurrent") || exceptionMsg.contains("conflicting")) && retryCount < 5) {
+logger.log(Level.WARN,
+  s"""
+     |DELTA Table Write Failure:
+     |$exceptionMsg
+     |Attempting Retry
+     |""".stripMargin)
+val concurrentWriteFailure = exceptionMsg.contains("concurrent") ||
+  exceptionMsg.contains("conflicting") ||
+  exceptionMsg.contains("all nested columns must match")
+if (exceptionMsg != null && concurrentWriteFailure && retryCount < 5) {
coolDown(target.tableFullName)
true
} else {
@@ -299,6 +310,10 @@
df.persist()
} else df
if (needsCache) inputDf.count()
+spark.sql(
+  s"""
+     |refresh table ${target.tableFullName}
+     |""".stripMargin)
performRetry(inputDf,target, pipelineSnapTime, maxMergeScanDates)
true
}
@@ -309,8 +324,9 @@
*/
private def coolDown(tableName: String): Unit = {
val rnd = new scala.util.Random
-val number:Long = (rnd.nextFloat() * 30 + 30).toLong*1000
-logger.log(Level.INFO,"Slowing multithreaded writing for " + tableName + "sleeping..." + number+" thread name "+Thread.currentThread().getName)
+val number:Long = ((rnd.nextFloat() * 30) + 30 + (rnd.nextFloat() * 30)).toLong*1000
+logger.log(Level.INFO,"DELTA WRITE COOLDOWN: Slowing multithreaded writing for " +
+  tableName + "sleeping..." + number + " thread name " + Thread.currentThread().getName)
Thread.sleep(number)
}

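Read together, the three hunks above widen the retry trigger (adding the "all nested columns must match" schema race alongside the concurrent/conflicting checks), refresh the Delta table before re-attempting the write, and stretch coolDown's sleep window from roughly 30-60s to 30-90s. A minimal standalone sketch of that classify-and-back-off pattern, with hypothetical names and a null-guarded message check:

  import scala.annotation.tailrec
  import scala.util.Random

  object DeltaWriteRetrySketch {
    private val rnd = new Random

    // Same arithmetic as the revised coolDown: a 30s floor plus two 0-30s
    // draws gives 30-90s (triangular, centered on 60s), so threads that
    // failed together are unlikely to wake together and collide again.
    def jitterMillis(): Long =
      ((rnd.nextFloat() * 30) + 30 + (rnd.nextFloat() * 30)).toLong * 1000L

    // Retryable iff the message looks like a Delta concurrency/schema race.
    def isRetryable(e: Throwable): Boolean = {
      val msg = Option(e.getMessage).map(_.toLowerCase).getOrElse("")
      msg.contains("concurrent") || msg.contains("conflicting") ||
        msg.contains("all nested columns must match")
    }

    @tailrec
    def withRetry[T](op: () => T, maxAttempts: Int = 5, attempt: Int = 1): T = {
      val outcome = try Right(op()) catch { case e: Throwable => Left(e) }
      outcome match {
        case Right(value) => value
        case Left(e) if isRetryable(e) && attempt < maxAttempts =>
          Thread.sleep(jitterMillis())
          withRetry(op, maxAttempts, attempt + 1)
        case Left(e) => throw e
      }
    }
  }

  // Usage, assuming a write that can hit Delta's optimistic-concurrency errors:
  // DeltaWriteRetrySketch.withRetry(() => database.write(inputDf, target, pipelineSnapTime, maxMergeScanDates))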
SparkSessionWrapper.scala
@@ -48,12 +48,17 @@ trait SparkSessionWrapper extends Serializable {
*/
private[overwatch] def spark(globalSession : Boolean = false): SparkSession = {

+logger.log(Level.INFO,
+  s"""
+     |Getting Spark for Thread ${Thread.currentThread().getId}
+     |SessionMapSize: ${sessionsMap.size}
+     |""".stripMargin)
if(SparkSessionWrapper.parSessionsOn){
if(globalSession){
buildSpark()
}
else{
-val currentThreadID = Thread.currentThread().getId()
+val currentThreadID = Thread.currentThread().getId
val sparkSession = sessionsMap.getOrElse(currentThreadID, buildSpark().newSession())
sessionsMap.put(currentThreadID, sparkSession)
sparkSession
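The logging added above makes the size of sessionsMap visible on every lookup, and the getId change just drops redundant parentheses from a parameterless accessor. For context, a simplified stand-in for the per-thread branch — assumed shape, not the actual trait:

  import org.apache.spark.sql.SparkSession
  import scala.collection.concurrent.TrieMap

  // Simplified stand-in; the real map lives on the SparkSessionWrapper companion.
  object PerThreadSessions {
    private val sessionsMap = new TrieMap[Long, SparkSession]()

    // newSession() shares the parent's SparkContext and cached data but gets an
    // isolated SQLConf, so each worker thread can set conf values (e.g. Delta
    // schema autoMerge) without racing the other writers.
    def forCurrentThread(parent: SparkSession): SparkSession = {
      val id = Thread.currentThread().getId
      val session = sessionsMap.getOrElse(id, parent.newSession())
      sessionsMap.put(id, session)
      session
    }
  }

getOrElseUpdate would do the lookup atomically in one call; the get-then-put shape mirrors the committed code, and is safe here because each key is only ever read and written by its own thread.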
