Skip to content

Commit

Permalink
pr review implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
sriram251-code committed Jan 6, 2023
1 parent 1de9581 commit ed8c521
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,7 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
Some(multiWorkspaceParams.deploymentId)
))
} finally {
SparkSessionWrapper.sessionsMap.remove(Thread.currentThread().getId)
logger.log(Level.INFO, s"""Removed ${Thread.currentThread().getId} from sessionMap""")
clearThreadFromSessionsMap()
}
}

Expand All @@ -220,8 +219,7 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
Some(multiWorkspaceParams.deploymentId)
))
} finally {
SparkSessionWrapper.sessionsMap.remove(Thread.currentThread().getId)
logger.log(Level.INFO, s"""Removed ${Thread.currentThread().getId} from sessionMap""")
clearThreadFromSessionsMap()
}
}

Expand All @@ -247,8 +245,7 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
Some(multiWorkspaceParams.deploymentId)
))
}finally {
SparkSessionWrapper.sessionsMap.remove(Thread.currentThread().getId)
logger.log(Level.INFO, s"""Removed ${Thread.currentThread().getId} from sessionMap""")
clearThreadFromSessionsMap()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,7 @@ trait BronzeTransforms extends SparkSessionWrapper {
val pathsGlob = validNewFilesWMetaDF
.filter(!'failed && 'withinSpecifiedTimeRange)
.orderBy('fileSize.desc)
.select('filename)
.select('fileName)
.as[String].collect
if (pathsGlob.nonEmpty) { // new files less bad files and already-processed files
logger.log(Level.INFO, s"VALID NEW EVENT LOGS FOUND: COUNT --> ${pathsGlob.length}")
Expand Down Expand Up @@ -798,7 +798,6 @@ trait BronzeTransforms extends SparkSessionWrapper {
TransformFunctions.stringTsToUnixMillis('timestamp)
} else col("Timestamp")

logger.log(Level.INFO, s"DEBUG: SparkEventsBronze Case Sensitivity set to: ${spark.conf.get("spark.sql.caseSensitive")}")
baseDF
.withColumn("Timestamp", fixDupTimestamps)
.drop("timestamp")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ class Pipeline(
postProcessor.refreshPipReportView(pipelineStateViewTarget)
//TODO clearcache will clear global cache multithread performance issue
// spark.catalog.clearCache()
SparkSessionWrapper.sessionsMap.remove(Thread.currentThread().getId)
clearThreadFromSessionsMap()
}

private[overwatch] def restoreSparkConf(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@ trait SparkSessionWrapper extends Serializable {
* Init environment. This structure alows for multiple calls to "reinit" the environment. Important in the case of
* autoscaling. When the cluster scales up/down envInit and then check for current cluster cores.
*/
@transient
lazy protected val _envInit: Boolean = envInit()


protected def buildSpark(): SparkSession = {
private def buildSpark(): SparkSession = {
sessionsMap.hashCode()
SparkSession
.builder()
Expand All @@ -47,7 +46,7 @@ trait SparkSessionWrapper extends Serializable {
* behavior differently to work in remote execution AND/OR local only mode but local only mode
* requires some additional setup.
*/
def spark(globalSession : Boolean = false): SparkSession = {
private[overwatch] def spark(globalSession : Boolean = false): SparkSession = {

if(SparkSessionWrapper.parSessionsOn){
if(globalSession){
Expand All @@ -69,6 +68,11 @@ trait SparkSessionWrapper extends Serializable {
lazy val sc: SparkContext = spark.sparkContext
// sc.setLogLevel("WARN")

def clearThreadFromSessionsMap(): Unit ={
sessionsMap.remove(Thread.currentThread().getId)
logger.log(Level.INFO, s"""Removed ${Thread.currentThread().getId} from sessionMap""")
}

def getCoresPerWorker: Int = sc.parallelize("1", 1)
.map(_ => java.lang.Runtime.getRuntime.availableProcessors).collect()(0)

Expand Down

0 comments on commit ed8c521

Please sign in to comment.