Commit

hold

GeekSheikh committed Jan 12, 2023
1 parent c9dee5c commit 2c04d39
Showing 6 changed files with 34 additions and 20 deletions.
@@ -179,6 +179,7 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
organizationID = Some(multiWorkspaceParams.workspaceId))

Bronze(workspace).run()
Bronze(workspace).restoreSparkConf()
println(s"""************Bronze Deployment Completed workspaceID:${multiWorkspaceParams.workspaceId}************ """)
deploymentReport.append(MultiWSDeploymentReport(multiWorkspaceParams.workspaceId, "Bronze", Some(multiWorkspaceParams.args),
"SUCCESS",
@@ -373,6 +374,10 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
// if (parallelism > 1)
SparkSessionWrapper.parSessionsOn = true
SparkSessionWrapper.sessionsMap.clear()

// initialize spark overrides for global spark conf
PipelineFunctions.setSparkOverrides(spark(globalSession = true), Map[String, String]())

val zoneArray = zones.split(",")
zoneArray.foreach(zone => {
val responseCounter = Collections.synchronizedList(new util.ArrayList[Int]())
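Note: the new call seeds the shared (global) session's conf before the per-zone deployment threads start, and the per-workspace Bronze(workspace).restoreSparkConf() call appears to reset each scoped session after its run. A minimal sketch of what a conf-override helper like PipelineFunctions.setSparkOverrides might do, assuming it simply sets each key on the session's runtime conf and skips keys that cannot be changed at runtime (the actual implementation may differ):

import org.apache.spark.sql.{AnalysisException, SparkSession}

object SparkOverrideSketch {
  // Hypothetical helper, for illustration only; the real PipelineFunctions.setSparkOverrides may differ.
  def applyOverrides(session: SparkSession, overrides: Map[String, String]): Unit = {
    overrides.foreach { case (key, value) =>
      try session.conf.set(key, value)
      catch {
        case _: AnalysisException => () // static confs cannot be modified on a live session; skip them
      }
    }
  }
}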
17 changes: 10 additions & 7 deletions src/main/scala/com/databricks/labs/overwatch/env/Database.scala
@@ -2,10 +2,10 @@ package com.databricks.labs.overwatch.env

import com.databricks.labs.overwatch.pipeline.TransformFunctions._
import com.databricks.labs.overwatch.pipeline.{PipelineFunctions, PipelineTable}
import com.databricks.labs.overwatch.utils.{Config, SparkSessionWrapper, WriteMode}
import com.databricks.labs.overwatch.utils.{Config, JsonUtils, SparkSessionWrapper, WriteMode}
import io.delta.tables.DeltaTable
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.functions.{lit, log}
import org.apache.spark.sql.streaming.StreamingQueryListener._
import org.apache.spark.sql.streaming.{DataStreamWriter, StreamingQuery, StreamingQueryListener}
import org.apache.spark.sql.{Column, DataFrame, DataFrameWriter, Row}
@@ -180,7 +180,11 @@ class Database(config: Config) extends SparkSessionWrapper {
finalSourceDF = if (!target.permitDuplicateKeys) finalSourceDF.dedupByKey(target.keys, target.incrementalColumns) else finalSourceDF

val finalDF = if (target.persistBeforeWrite) persistAndLoad(finalSourceDF, target) else finalSourceDF

val sparkGlobalConf = JsonUtils.objToJson(spark(globalSession = true).conf.getAll).compactString
val sparkLocalSessionConf = JsonUtils.objToJson(spark.conf.getAll).compactString
logger.log(Level.INFO, s"SPARK GLOBAL CONF:\n$sparkGlobalConf")
logger.log(Level.INFO, s"SPARK LOCAL CONF:\n$sparkLocalSessionConf")
logger.log(Level.INFO, s"SPARK LOCAL HASH CODE BEFORE WRITE: ${spark.hashCode()} for thread ${Thread.currentThread().getId}")
// ON FIRST RUN - WriteMode is automatically overwritten to APPEND
if (target.writeMode == WriteMode.merge) { // DELTA MERGE / UPSERT
val deltaTarget = DeltaTable.forPath(target.tableLocation).alias("target")
@@ -203,6 +207,9 @@
|""".stripMargin
logger.log(Level.INFO, mergeDetailMsg)
spark.conf.set("spark.databricks.delta.commitInfo.userMetadata", config.runID)
logger.log(Level.INFO, s"Delta Schema AutoMerge Conf ORIGINAL: ${spark.conf.get("spark.databricks.delta.schema.autoMerge.enabled")}")
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
logger.log(Level.INFO, s"Delta Schema AutoMerge Conf FORCED: ${spark.conf.get("spark.databricks.delta.schema.autoMerge.enabled")}")
// TODO -- when DBR 9.1 LTS GA, use LSM (low-shuffle-merge) to improve pipeline
deltaTarget
.merge(updatesDF, mergeCondition)
@@ -310,10 +317,6 @@
df.persist()
} else df
if (needsCache) inputDf.count()
spark.sql(
s"""
|refresh table ${target.tableFullName}
|""".stripMargin)
performRetry(inputDf,target, pipelineSnapTime, maxMergeScanDates)
true
}
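Note: forcing spark.databricks.delta.schema.autoMerge.enabled immediately before the merge, rather than relying on the session's initial conf, makes schema evolution explicit at the point of the upsert. A rough sketch of the overall pattern, with tablePath, keyCondition and updatesDF as placeholder names rather than Overwatch's own:

import io.delta.tables.DeltaTable
import org.apache.spark.sql.{DataFrame, SparkSession}

object DeltaUpsertSketch {
  // Illustrative upsert only; the target path, key condition and source frame are placeholders.
  def upsert(spark: SparkSession, tablePath: String, updatesDF: DataFrame, keyCondition: String): Unit = {
    // allow new source columns to be added to the target schema during the merge
    spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
    DeltaTable.forPath(spark, tablePath).alias("target")
      .merge(updatesDF.alias("updates"), keyCondition)
      .whenMatched().updateAll()
      .whenNotMatched().insertAll()
      .execute()
  }
}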
@@ -568,14 +568,15 @@ object Initializer extends SparkSessionWrapper {
val config = new Config()
if(organizationID.isEmpty) {
config.setOrganizationId(getOrgId)
}else{
}else{ // is multiWorkspace deployment since orgID is passed
logger.log(Level.INFO, "Setting multiworkspace deployment")
config.setOrganizationId(organizationID.get)
if (apiUrl.nonEmpty) {
config.setApiUrl(apiUrl)
}
config.setIsMultiworkspaceDeployment(true)
}
// set spark overrides in scoped spark session
config.registerInitialSparkConf(spark.conf.getAll)
config.setInitialWorkerCount(getNumberOfWorkerNodes)
config.setInitialShuffleParts(spark.conf.get("spark.sql.shuffle.partitions").toInt)
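Note: the new comment in Initializer points out that the initial spark conf is captured from the scoped (per-thread) session, presumably the same snapshot that restoreSparkConf() later restores in each pipeline. A small sketch of the snapshot-and-restore idea, using hypothetical names rather than Overwatch's API:

import org.apache.spark.sql.{AnalysisException, SparkSession}

object ConfSnapshotSketch {
  // Hypothetical names; shown only to illustrate capturing and restoring a session conf.
  def snapshot(spark: SparkSession): Map[String, String] = spark.conf.getAll

  def restore(spark: SparkSession, saved: Map[String, String]): Unit =
    saved.foreach { case (k, v) =>
      try spark.conf.set(k, v)
      catch { case _: AnalysisException => () } // static confs are left untouched
    }
}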
@@ -1,8 +1,8 @@
package com.databricks.labs.overwatch.pipeline

import com.databricks.labs.overwatch.env.{Database, Workspace}
import com.databricks.labs.overwatch.utils.{Config, OverwatchScope}
import org.apache.log4j.Logger
import com.databricks.labs.overwatch.utils.{Config, JsonUtils, OverwatchScope}
import org.apache.log4j.{Level, Logger}

class Silver(_workspace: Workspace, _database: Database, _config: Config)
extends Pipeline(_workspace, _database, _config)
@@ -361,6 +361,9 @@ class Silver(_workspace: Workspace, _database: Database, _config: Config)
def run(): Pipeline = {

restoreSparkConf()
val sparkLocalSessionConf = JsonUtils.objToJson(spark.conf.getAll).compactString
logger.log(Level.INFO, s"SPARK LOCAL CONF AFTER CONF INITIALIZED:\n$sparkLocalSessionConf")
logger.log(Level.INFO, s"SPARK LOCAL HASH CODE AFTER CONF INITIALIZED: ${spark.hashCode()} for thread ${Thread.currentThread().getId}")
executeModules()
initiatePostProcessing()
this // to be used as fail switch later if necessary
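Note: the logging added in Silver.run, like the matching lines added in Database.scala, records which SparkSession instance a thread holds (via its hash code) together with its full conf, which is the information needed to debug conf bleed between parallel per-workspace sessions. A sketch of an equivalent standalone debug helper, assuming log4j 1.x as already used in these files:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object SessionDebugSketch {
  private val logger: Logger = Logger.getLogger(getClass)

  // Illustrative helper: log which SparkSession a thread is using and the conf it carries.
  def logSessionState(spark: SparkSession, label: String): Unit = {
    val threadId = Thread.currentThread().getId
    val confDump = spark.conf.getAll.map { case (k, v) => s"$k=$v" }.mkString("\n")
    logger.log(Level.INFO, s"[$label] session ${spark.hashCode()} on thread $threadId\n$confDump")
  }
}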
10 changes: 2 additions & 8 deletions src/main/scala/com/databricks/labs/overwatch/utils/Config.scala
@@ -153,15 +153,9 @@ class Config() {
"spark.databricks.delta.optimizeWrite.numShuffleBlocks" ->
value.getOrElse("spark.databricks.delta.optimizeWrite.numShuffleBlocks", "50000"),
"spark.databricks.delta.optimizeWrite.binSize" ->
value.getOrElse("spark.databricks.delta.optimizeWrite.binSize", "512"),
"spark.sql.shuffle.partitions" -> "400", // allow aqe to shrink
"spark.sql.caseSensitive" -> "false",
"spark.sql.autoBroadcastJoinThreshold" -> "10485760",
"spark.sql.adaptive.autoBroadcastJoinThreshold" -> "10485760",
"spark.databricks.delta.schema.autoMerge.enabled" -> "true",
"spark.sql.optimizer.collapseProjectAlwaysInline" -> "true" // temporary workaround ES-318365
value.getOrElse("spark.databricks.delta.optimizeWrite.binSize", "512")
)
_initialSparkConf = value ++ manualOverrides
_initialSparkConf = value ++ manualOverrides ++ SparkSessionWrapper.globalSparkConfOverrides
this
}
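Note: the per-run tuning defaults move out of Config and into SparkSessionWrapper.globalSparkConfOverrides, and because Scala's ++ is right-biased, the global overrides win any key collision with the captured cluster conf or the remaining manual defaults. A tiny illustration with made-up values:

// Right-hand operands of ++ win on key collisions.
val captured = Map("spark.sql.shuffle.partitions" -> "200")
val manual   = Map("spark.databricks.delta.optimizeWrite.binSize" -> "512")
val global   = Map("spark.sql.shuffle.partitions" -> "400")
val initial  = captured ++ manual ++ global
// initial("spark.sql.shuffle.partitions") == "400"
// initial("spark.databricks.delta.optimizeWrite.binSize") == "512"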

@@ -13,6 +13,14 @@ object SparkSessionWrapper {

var parSessionsOn = false
private[overwatch] val sessionsMap = new ConcurrentHashMap[Long, SparkSession]().asScala
private[overwatch] val globalSparkConfOverrides = Map(
"spark.sql.shuffle.partitions" -> "400", // allow aqe to shrink
"spark.sql.caseSensitive" -> "false",
"spark.sql.autoBroadcastJoinThreshold" -> "10485760",
"spark.sql.adaptive.autoBroadcastJoinThreshold" -> "10485760",
"spark.databricks.delta.schema.autoMerge.enabled" -> "true",
"spark.sql.optimizer.collapseProjectAlwaysInline" -> "true" // temporary workaround ES-318365
)

}

@@ -67,8 +75,8 @@ trait SparkSessionWrapper extends Serializable {
buildSpark()
}
}
@transient
lazy val spark:SparkSession = spark(false)

@transient lazy val spark:SparkSession = spark(false)

lazy val sc: SparkContext = spark.sparkContext
// sc.setLogLevel("WARN")
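Note: sessionsMap appears to key sessions by thread id, so each deployment thread can be handed its own SparkSession, and marking the lazy spark val @transient keeps the non-serializable session out of task closures when the wrapper is serialized. A simplified sketch of the per-thread lookup, assuming the real wrapper adds handling for parSessionsOn and the global-session flag:

import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import org.apache.spark.sql.SparkSession

object PerThreadSessionSketch {
  // Simplified illustration; the real SparkSessionWrapper does more (global session, parSessionsOn).
  private val sessions = new ConcurrentHashMap[Long, SparkSession]().asScala

  // Return the calling thread's session, creating an isolated one from the base session if needed.
  def sessionFor(base: SparkSession): SparkSession =
    sessions.getOrElseUpdate(Thread.currentThread().getId, base.newSession())
}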
