
Commit 3903d27

Spark events bronze per session (#678)
* initial commit

* Test Spark per session

* notes from meeting

* per-session implemented

* PR review feedback implemented

* 686 - SparkEvents Executor ID Schema Handler

* global Session - initialize configs

* Concurrent Writes - Table Lock for Parallelized Loads (#691)

* initial commit -- working

* code modified; preWriteActions added

* re-instated perform retry for legacy deployments and added comments

* added logging details

* clear sessionsMap on batch runner

---------

Co-authored-by: sriram251-code <sriram.mohanty@databricks.com>

* minor fixes

---------

Co-authored-by: Daniel Tomes <10840635+GeekSheikh@users.noreply.github.com>
Co-authored-by: geeksheikh <geeksheikh@users.noreply.github.com>
3 people committed Feb 3, 2023
1 parent 5806e44 commit 3903d27
Showing 11 changed files with 282 additions and 119 deletions.
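The "Concurrent Writes - Table Lock for Parallelized Loads (#691)" item in the commit message concerns serializing parallel writers that target the same table. Below is a minimal, self-contained Scala sketch of that idea; the object and method names are hypothetical, and the actual Overwatch globalTableLock and preWriteActions code is not among the files shown on this page.

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.ReentrantLock

object TableLockSketch {
  // one lock per target table name, created lazily on first use
  private val tableLocks = new ConcurrentHashMap[String, ReentrantLock]()

  def withTableLock[T](tableName: String)(write: => T): T = {
    val lock = tableLocks.computeIfAbsent(tableName, _ => new ReentrantLock())
    lock.lock()
    try write // only one thread writes a given table at a time
    finally lock.unlock()
  }
}

A parallel loader would wrap each write in withTableLock("target_table") { ... } so concurrent workspace deployments cannot race on the same target.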
src/main/scala/com/databricks/labs/overwatch/ApiCallV2.scala (12 changes: 2 additions & 10 deletions)
@@ -56,13 +56,12 @@ object ApiCallV2 extends SparkSessionWrapper {
    * @param accumulator To make track of number of api request.
    * @return
    */
-  def apply(apiEnv: ApiEnv, apiName: String, queryMap: Map[String, String], tempSuccessPath: String, accumulator: LongAccumulator): ApiCallV2 = {
+  def apply(apiEnv: ApiEnv, apiName: String, queryMap: Map[String, String], tempSuccessPath: String): ApiCallV2 = {
     new ApiCallV2(apiEnv)
       .setEndPoint(apiName)
       .buildMeta(apiName)
       .setQueryMap(queryMap)
       .setSuccessTempPath(tempSuccessPath)
-      .setAccumulator(accumulator)
   }

/**
@@ -125,9 +124,7 @@ class ApiCallV2(apiEnv: ApiEnv) extends SparkSessionWrapper {
   private var _apiFailureCount: Int = 0
   private var _printFinalStatusFlag: Boolean = true
   private var _queryMap: Map[String, String] = Map[String, String]()
-  private var _accumulator: LongAccumulator = sc.longAccumulator("ApiAccumulator") //Multithreaded call accumulator will make track of the request.

-  protected def accumulator: LongAccumulator = _accumulator
   protected def apiSuccessCount: Int = _apiSuccessCount

   protected def apiFailureCount: Int = _apiFailureCount
@@ -148,10 +145,6 @@ class ApiCallV2(apiEnv: ApiEnv) extends SparkSessionWrapper {

   protected def queryMap: Map[String, String] = _queryMap

-  private[overwatch] def setAccumulator(value: LongAccumulator): this.type = {
-    _accumulator = value
-    this
-  }

   private[overwatch] def setApiV(value: Double): this.type = {
     apiMeta.setApiV("api/"+value)
@@ -549,7 +542,7 @@ class ApiCallV2(apiEnv: ApiEnv) extends SparkSessionWrapper {
    * Performs api calls in parallel.
    * @return
    */
-  def executeMultiThread(): util.ArrayList[String] = {
+  def executeMultiThread(accumulator: LongAccumulator): util.ArrayList[String] = {
     @tailrec def executeThreadedHelper(): util.ArrayList[String] = {
       val response = getResponse
       responseCodeHandler(response)
@@ -607,7 +600,6 @@ class ApiCallV2(apiEnv: ApiEnv) extends SparkSessionWrapper {
     responseCodeHandler(response)
     _apiResponseArray.add(response.body)
     if (apiMeta.storeInTempLocation && successTempPath.nonEmpty) {
-      accumulator.add(1)
       if (apiEnv.successBatchSize <= _apiResponseArray.size()) { //Checking if its right time to write the batches into persistent storage
         val responseFlag = PipelineFunctions.writeMicroBatchToTempLocation(successTempPath.get, _apiResponseArray.toString)
         if (responseFlag) { //Clearing the resultArray in-case of successful write
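Net effect of the ApiCallV2 changes above: the LongAccumulator is no longer created inside the class (which tied it to whatever SparkContext was active at construction time) but is supplied by the caller of executeMultiThread, so one driver-owned counter can track requests across all threads and sessions. A minimal, self-contained sketch of that caller-owned-accumulator pattern follows; it is not Overwatch code and the names are illustrative.

import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import org.apache.spark.sql.SparkSession

object CallerOwnedAccumulatorSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("acc-sketch").getOrCreate()
  // the caller owns the accumulator and hands it to each parallel call
  val requestCounter = spark.sparkContext.longAccumulator("ApiAccumulator")

  implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))

  // Each future stands in for one threaded API call; the shared counter is bumped inside
  // a tiny Spark action so updates merge back into the single driver-owned accumulator.
  val calls = (1 to 4).map { _ =>
    Future {
      spark.sparkContext.parallelize(Seq(1), numSlices = 1).foreach(_ => requestCounter.add(1))
    }
  }
  Await.result(Future.sequence(calls), 2.minutes)
  println(s"requests tracked across threads: ${requestCounter.value}") // 4
  spark.stop()
}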
BatchRunner.scala
@@ -7,6 +7,8 @@ import org.apache.log4j.{Level, Logger}
 object BatchRunner extends SparkSessionWrapper {

   private val logger: Logger = Logger.getLogger(this.getClass)
+  SparkSessionWrapper.sessionsMap.clear()
+  SparkSessionWrapper.globalTableLock.clear()

   private def setGlobalDeltaOverrides(): Unit = {
     spark.conf.set("spark.databricks.delta.optimize.maxFileSize", 1024 * 1024 * 128)
@@ -68,7 +70,6 @@ object BatchRunner extends SparkSessionWrapper {
       Gold(workspace).run()
     }

-
   }

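The two lines added to BatchRunner reset state that SparkSessionWrapper keeps at the JVM level, so a fresh batch run does not reuse per-thread sessions or table locks cached by a previous run on the same driver. A rough, self-contained sketch of what such a cache could look like follows; the real SparkSessionWrapper internals are not part of this diff, so the types and structure below are assumptions.

import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.sql.SparkSession

object SessionCacheSketch {
  // assumed shape: one cached SparkSession per deployment-thread id
  val sessionsMap = new ConcurrentHashMap[Long, SparkSession]()
  // assumed shape: one lock object per table name, guarding concurrent writers
  val globalTableLock = new ConcurrentHashMap[String, Object]()

  def resetForNewRun(): Unit = {
    // stale entries from an earlier run on the same driver JVM would otherwise leak into this one
    sessionsMap.clear()
    globalTableLock.clear()
  }
}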
MultiWorkspaceDeployment.scala
@@ -2,6 +2,7 @@ package com.databricks.labs.overwatch

 import com.databricks.labs.overwatch.pipeline.TransformFunctions._
 import com.databricks.labs.overwatch.pipeline._
+import com.databricks.labs.overwatch.utils.SparkSessionWrapper.parSessionsOn
 import com.databricks.labs.overwatch.utils._
 import com.databricks.labs.overwatch.validation.DeploymentValidation
 import org.apache.log4j.{Level, Logger}
@@ -191,6 +192,8 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
           fullMsg,
           Some(multiWorkspaceParams.deploymentId)
         ))
+    } finally {
+      clearThreadFromSessionsMap()
     }
   }

@@ -215,6 +218,8 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
           fullMsg,
           Some(multiWorkspaceParams.deploymentId)
         ))
+    } finally {
+      clearThreadFromSessionsMap()
     }
   }

@@ -239,6 +244,8 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
           fullMsg,
           Some(multiWorkspaceParams.deploymentId)
         ))
+    }finally {
+      clearThreadFromSessionsMap()
     }
   }
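Each of the three zone starters above (startBronzeDeployment, startSilverDeployment, startGoldDeployment) now clears the calling thread's cached session in a finally block. The deployment futures run on a fixed thread pool, so pool threads are reused across workspaces; without this cleanup a recycled thread could pick up the previous workspace's session. A hypothetical sketch of the pattern follows; clearThreadFromSessionsMap exists in Overwatch, but its body is not shown in this diff, so the cache below is illustrative.

import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.sql.SparkSession

object PerThreadSessionCleanupSketch {
  // assumed cache shape: one SparkSession per worker-thread id
  private val sessionsMap = new ConcurrentHashMap[Long, SparkSession]()

  def clearThreadFromSessionsMap(): Unit =
    sessionsMap.remove(Thread.currentThread().getId)

  // mirrors the try/finally added to each start*Deployment method
  def runZone(work: () => Unit): Unit =
    try work()
    finally clearThreadFromSessionsMap()
}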

@@ -355,42 +362,54 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
    */
   def deploy(parallelism: Int = 4, zones: String = "Bronze,Silver,Gold"): Unit = {
     val processingStartTime = System.currentTimeMillis();
-    println("ParallelismLevel :" + parallelism)
-
-    val multiWorkspaceConfig = generateMultiWorkspaceConfig(configCsvPath, deploymentId, outputPath)
-    snapshotConfig(multiWorkspaceConfig)
-    val params = DeploymentValidation
-      .performMandatoryValidation(multiWorkspaceConfig, parallelism)
-      .map(buildParams)
-
-    println("Workspace to be Deployed :" + params.size)
-    val zoneArray = zones.split(",")
-    zoneArray.foreach(zone => {
-      val responseCounter = Collections.synchronizedList(new util.ArrayList[Int]())
-      implicit val ec: ExecutionContextExecutor = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(parallelism))
-      params.foreach(deploymentParams => {
-        val future = Future {
-          zone.toLowerCase match {
-            case "bronze" =>
-              startBronzeDeployment(deploymentParams)
-            case "silver" =>
-              startSilverDeployment(deploymentParams)
-            case "gold" =>
-              startGoldDeployment(deploymentParams)
-          }
-        }
-        future.onComplete {
-          case _ =>
-            responseCounter.add(1)
-        }
-      })
-      while (responseCounter.size() < params.length) {
-        Thread.sleep(5000)
-      }
-    })
-    saveDeploymentReport(deploymentReport, multiWorkspaceConfig.head.etl_storage_prefix, "deploymentReport")
+    try {
+      if (parallelism > 1) SparkSessionWrapper.parSessionsOn = true
+      SparkSessionWrapper.sessionsMap.clear()
+      SparkSessionWrapper.globalTableLock.clear()
+
+      // initialize spark overrides for global spark conf
+      PipelineFunctions.setSparkOverrides(spark(globalSession = true), SparkSessionWrapper.globalSparkConfOverrides)
+
+      println("ParallelismLevel :" + parallelism)
+      val multiWorkspaceConfig = generateMultiWorkspaceConfig(configCsvPath, deploymentId, outputPath)
+      snapshotConfig(multiWorkspaceConfig)
+      val params = DeploymentValidation
+        .performMandatoryValidation(multiWorkspaceConfig, parallelism)
+        .map(buildParams)
+      println("Workspace to be Deployed :" + params.size)
+
+      val zoneArray = zones.split(",")
+      zoneArray.foreach(zone => {
+        val responseCounter = Collections.synchronizedList(new util.ArrayList[Int]())
+        implicit val ec: ExecutionContextExecutor = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(parallelism))
+        params.foreach(deploymentParams => {
+          val future = Future {
+            zone.toLowerCase match {
+              case "bronze" =>
+                startBronzeDeployment(deploymentParams)
+              case "silver" =>
+                startSilverDeployment(deploymentParams)
+              case "gold" =>
+                startGoldDeployment(deploymentParams)
+            }
+          }
+          future.onComplete {
+            case _ =>
+              responseCounter.add(1)
+          }
+        })
+        while (responseCounter.size() < params.length) {
+          Thread.sleep(5000)
+        }
+      })
+      saveDeploymentReport(deploymentReport, multiWorkspaceConfig.head.etl_storage_prefix, "deploymentReport")
+    } catch {
+      case e: Exception => throw e
+    } finally {
+      SparkSessionWrapper.sessionsMap.clear()
+      SparkSessionWrapper.globalTableLock.clear()
+    }
     println(s"""Deployment completed in sec ${(System.currentTimeMillis() - processingStartTime) / 1000}""")

   }

   /**
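The deploy() refactor above turns on SparkSessionWrapper.parSessionsOn when parallelism > 1, applies the global Spark conf overrides to a global session, and clears the session and table-lock maps both before the run and in the finally block. The sketch below shows one way a spark(globalSession = ...) accessor could distinguish the shared global session from per-thread sessions once parSessionsOn is set; it is illustrative only, since SparkSessionWrapper's actual implementation is not among the files shown on this page.

import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.sql.SparkSession

trait SessionWrapperSketch {
  @volatile var parSessionsOn: Boolean = false
  private val sessionsMap = new ConcurrentHashMap[Long, SparkSession]()

  def spark(globalSession: Boolean = false): SparkSession = {
    if (globalSession || !parSessionsOn) {
      // single shared session: conf overrides applied here are visible to every consumer
      SparkSession.builder().getOrCreate()
    } else {
      // per-thread session, created lazily from the shared context and cached by thread id
      val threadId = Thread.currentThread().getId
      Option(sessionsMap.get(threadId)).getOrElse {
        val s = SparkSession.builder().getOrCreate().newSession()
        sessionsMap.put(threadId, s)
        s
      }
    }
  }
}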
(Diffs for the remaining eight changed files are not shown on this page.)
