
Spark events bronze par session #678

Merged · 14 commits · Feb 3, 2023
2 changes: 1 addition & 1 deletion build.sbt

@@ -2,7 +2,7 @@ name := "overwatch"
 
 organization := "com.databricks.labs"
 
-version := "0.7.1.0"
+version := "0.7.1.1"
 
 scalaVersion := "2.12.12"
 scalacOptions ++= Seq("-Xmax-classfile-name", "78")
12 changes: 2 additions & 10 deletions src/main/scala/com/databricks/labs/overwatch/ApiCallV2.scala

@@ -56,13 +56,12 @@ object ApiCallV2 extends SparkSessionWrapper {
    * @param accumulator Used to keep track of the number of API requests.
    * @return
    */
-  def apply(apiEnv: ApiEnv, apiName: String, queryMap: Map[String, String], tempSuccessPath: String, accumulator: LongAccumulator): ApiCallV2 = {
+  def apply(apiEnv: ApiEnv, apiName: String, queryMap: Map[String, String], tempSuccessPath: String): ApiCallV2 = {
     new ApiCallV2(apiEnv)
       .setEndPoint(apiName)
       .buildMeta(apiName)
       .setQueryMap(queryMap)
       .setSuccessTempPath(tempSuccessPath)
-      .setAccumulator(accumulator)
   }

@@ -125,9 +124,7 @@ class ApiCallV2(apiEnv: ApiEnv) extends SparkSessionWrapper {
   private var _apiFailureCount: Int = 0
   private var _printFinalStatusFlag: Boolean = true
   private var _queryMap: Map[String, String] = Map[String, String]()
-  private var _accumulator: LongAccumulator = sc.longAccumulator("ApiAccumulator") // Multithreaded call accumulator keeps track of the requests.
 
-  protected def accumulator: LongAccumulator = _accumulator
   protected def apiSuccessCount: Int = _apiSuccessCount
 
   protected def apiFailureCount: Int = _apiFailureCount

@@ -148,10 +145,6 @@ class ApiCallV2(apiEnv: ApiEnv) extends SparkSessionWrapper {
 
   protected def queryMap: Map[String, String] = _queryMap
 
-  private[overwatch] def setAccumulator(value: LongAccumulator): this.type = {
-    _accumulator = value
-    this
-  }
 
   private[overwatch] def setApiV(value: Double): this.type = {
     apiMeta.setApiV("api/" + value)

@@ -549,7 +542,7 @@ class ApiCallV2(apiEnv: ApiEnv) extends SparkSessionWrapper {
    * Performs API calls in parallel.
    * @return
    */
-  def executeMultiThread(): util.ArrayList[String] = {
+  def executeMultiThread(accumulator: LongAccumulator): util.ArrayList[String] = {
     @tailrec def executeThreadedHelper(): util.ArrayList[String] = {
       val response = getResponse
       responseCodeHandler(response)

@@ -607,7 +600,6 @@ class ApiCallV2(apiEnv: ApiEnv) extends SparkSessionWrapper {
     responseCodeHandler(response)
     _apiResponseArray.add(response.body)
     if (apiMeta.storeInTempLocation && successTempPath.nonEmpty) {
-      accumulator.add(1)
       if (apiEnv.successBatchSize <= _apiResponseArray.size()) { // Checking if it's the right time to write the batches into persistent storage
         val responseFlag = PipelineFunctions.writeMicroBatchToTempLocation(successTempPath.get, _apiResponseArray.toString)
         if (responseFlag) { // Clearing the resultArray in case of a successful write
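The net effect of these ApiCallV2 changes is that the LongAccumulator is no longer a per-instance field created via sc.longAccumulator but is supplied by the caller, so every thread of a parallel run increments one shared counter. A minimal standalone sketch of that pattern follows; the worker function and names are hypothetical stand-ins, not the real ApiCallV2 API:

import java.util.concurrent.Executors

import org.apache.spark.sql.SparkSession
import org.apache.spark.util.LongAccumulator

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object SharedAccumulatorSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("acc-sketch").getOrCreate()
    // One driver-side counter shared by every parallel call, mirroring the
    // accumulator that calling code now passes into executeMultiThread.
    val requestCounter: LongAccumulator = spark.sparkContext.longAccumulator("ApiRequestCounter")

    val pool = Executors.newFixedThreadPool(4)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

    // Hypothetical stand-in for an API call that records each request it makes.
    def executeMultiThread(accumulator: LongAccumulator): Unit = accumulator.add(1)

    val calls = (1 to 16).map(_ => Future(executeMultiThread(requestCounter)))
    Await.result(Future.sequence(calls), 1.minute)

    println(s"requests tracked: ${requestCounter.value}") // prints 16
    pool.shutdown()
    spark.stop()
  }
}

Because the counter lives with the caller rather than inside each ApiCallV2 instance, progress tracking survives however many worker objects the threads construct.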
src/main/scala/com/databricks/labs/overwatch/BatchRunner.scala
@@ -7,6 +7,8 @@ import org.apache.log4j.{Level, Logger}
 object BatchRunner extends SparkSessionWrapper {
 
   private val logger: Logger = Logger.getLogger(this.getClass)
+  SparkSessionWrapper.sessionsMap.clear()
+  SparkSessionWrapper.globalTableLock.clear()
 
   private def setGlobalDeltaOverrides(): Unit = {
     spark.conf.set("spark.databricks.delta.optimize.maxFileSize", 1024 * 1024 * 128)

@@ -68,7 +70,6 @@ object BatchRunner extends SparkSessionWrapper {
     Gold(workspace).run()
   }
 
-
 }
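For context, clearing these two structures when BatchRunner starts guards against state left behind by an earlier run on the same JVM. A rough sketch of the shape of that global state, with deliberately simplified types (the real SparkSessionWrapper caches Spark sessions, not strings):

import java.util.concurrent.ConcurrentHashMap

// Hypothetical simplification of SparkSessionWrapper's shared state: a map of
// per-thread sessions and a map of per-table locks, both global to the JVM.
object SessionStateSketch {
  val sessionsMap = new ConcurrentHashMap[Long, String]()       // threadId -> session handle
  val globalTableLock = new ConcurrentHashMap[String, AnyRef]() // table name -> lock object

  // Resetting both maps up front, as BatchRunner now does, prevents sessions
  // cached by a previous job on the same cluster from leaking into this run.
  def reset(): Unit = {
    sessionsMap.clear()
    globalTableLock.clear()
  }
}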
src/main/scala/com/databricks/labs/overwatch/MultiWorkspaceDeployment.scala
@@ -2,6 +2,7 @@ package com.databricks.labs.overwatch
 
 import com.databricks.labs.overwatch.pipeline.TransformFunctions._
 import com.databricks.labs.overwatch.pipeline._
+import com.databricks.labs.overwatch.utils.SparkSessionWrapper.parSessionsOn
 import com.databricks.labs.overwatch.utils._
 import com.databricks.labs.overwatch.validation.DeploymentValidation
 import org.apache.log4j.{Level, Logger}

@@ -191,6 +192,8 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
         fullMsg,
         Some(multiWorkspaceParams.deploymentId)
       ))
+    } finally {
+      clearThreadFromSessionsMap()
     }
   }

@@ -215,6 +218,8 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
         fullMsg,
         Some(multiWorkspaceParams.deploymentId)
       ))
+    } finally {
+      clearThreadFromSessionsMap()
     }
   }

@@ -239,6 +244,8 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
         fullMsg,
         Some(multiWorkspaceParams.deploymentId)
       ))
+    } finally {
+      clearThreadFromSessionsMap()
     }
   }
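Each deployment stage (bronze, silver, gold) now evicts its thread's cached session on the way out, on success or failure alike. A hedged sketch of what a clearThreadFromSessionsMap-style helper can look like; the map and helper below are illustrative stand-ins, not the actual SparkSessionWrapper internals:

import java.util.concurrent.ConcurrentHashMap

object ThreadCleanupSketch {
  // Stand-in for the shared per-thread session registry.
  private val sessionsMap = new ConcurrentHashMap[Long, String]()

  // Illustrative equivalent of clearThreadFromSessionsMap(): drop only the
  // current thread's entry, leaving other workers' sessions untouched.
  private def clearThreadFromSessionsMap(): Unit =
    sessionsMap.remove(Thread.currentThread().getId)

  def runStage(stage: () => Unit): Unit = {
    sessionsMap.put(Thread.currentThread().getId, "session-handle")
    try {
      stage()
    } finally {
      clearThreadFromSessionsMap() // runs whether stage() succeeded or threw
    }
  }
}

Putting the cleanup in finally, as the diff does, is what keeps a failed workspace deployment from stranding its session entry in the shared map.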

@@ -355,42 +362,54 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
    */
   def deploy(parallelism: Int = 4, zones: String = "Bronze,Silver,Gold"): Unit = {
     val processingStartTime = System.currentTimeMillis();
-    println("ParallelismLevel :" + parallelism)
-
-    val multiWorkspaceConfig = generateMultiWorkspaceConfig(configCsvPath, deploymentId, outputPath)
-    snapshotConfig(multiWorkspaceConfig)
-    val params = DeploymentValidation
-      .performMandatoryValidation(multiWorkspaceConfig, parallelism)
-      .map(buildParams)
-
-    println("Workspace to be Deployed :" + params.size)
-    val zoneArray = zones.split(",")
-    zoneArray.foreach(zone => {
-      val responseCounter = Collections.synchronizedList(new util.ArrayList[Int]())
-      implicit val ec: ExecutionContextExecutor = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(parallelism))
-      params.foreach(deploymentParams => {
-        val future = Future {
-          zone.toLowerCase match {
-            case "bronze" =>
-              startBronzeDeployment(deploymentParams)
-            case "silver" =>
-              startSilverDeployment(deploymentParams)
-            case "gold" =>
-              startGoldDeployment(deploymentParams)
-          }
-        }
-        future.onComplete {
-          case _ =>
-            responseCounter.add(1)
-        }
-      })
-      while (responseCounter.size() < params.length) {
-        Thread.sleep(5000)
-      }
-    })
-    saveDeploymentReport(deploymentReport, multiWorkspaceConfig.head.etl_storage_prefix, "deploymentReport")
+    try {
+      if (parallelism > 1) SparkSessionWrapper.parSessionsOn = true
+      SparkSessionWrapper.sessionsMap.clear()
+      SparkSessionWrapper.globalTableLock.clear()
+
+      // initialize spark overrides for global spark conf
+      PipelineFunctions.setSparkOverrides(spark(globalSession = true), SparkSessionWrapper.globalSparkConfOverrides)
+
+      println("ParallelismLevel :" + parallelism)
+      val multiWorkspaceConfig = generateMultiWorkspaceConfig(configCsvPath, deploymentId, outputPath)
+      snapshotConfig(multiWorkspaceConfig)
+      val params = DeploymentValidation
+        .performMandatoryValidation(multiWorkspaceConfig, parallelism)
+        .map(buildParams)
+      println("Workspace to be Deployed :" + params.size)
+
+      val zoneArray = zones.split(",")
+      zoneArray.foreach(zone => {
+        val responseCounter = Collections.synchronizedList(new util.ArrayList[Int]())
+        implicit val ec: ExecutionContextExecutor = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(parallelism))
+        params.foreach(deploymentParams => {
+          val future = Future {
+            zone.toLowerCase match {
+              case "bronze" =>
+                startBronzeDeployment(deploymentParams)
+              case "silver" =>
+                startSilverDeployment(deploymentParams)
+              case "gold" =>
+                startGoldDeployment(deploymentParams)
+            }
+          }
+          future.onComplete {
+            case _ =>
+              responseCounter.add(1)
+          }
+        })
+        while (responseCounter.size() < params.length) {
+          Thread.sleep(5000)
+        }
+      })
+      saveDeploymentReport(deploymentReport, multiWorkspaceConfig.head.etl_storage_prefix, "deploymentReport")
+    } catch {
+      case e: Exception => throw e
+    } finally {
+      SparkSessionWrapper.sessionsMap.clear()
+      SparkSessionWrapper.globalTableLock.clear()
+    }
     println(s"""Deployment completed in sec ${(System.currentTimeMillis() - processingStartTime) / 1000}""")
 
   }
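The rewritten deploy() keeps its original completion barrier: one Future per workspace on a bounded pool, a synchronized list as a completion counter, and a polling loop. Extracted into a standalone sketch, with the workspace names and the per-future work as placeholders:

import java.util.concurrent.Executors
import java.util.{ArrayList => JArrayList, Collections}

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}

object CompletionBarrierSketch {
  def main(args: Array[String]): Unit = {
    val params = Seq("workspace-1", "workspace-2", "workspace-3") // placeholder deployment params
    val parallelism = 2

    val pool = Executors.newFixedThreadPool(parallelism)
    implicit val ec: ExecutionContextExecutor = ExecutionContext.fromExecutor(pool)

    // Every completed future, failed or not, appends one marker to the list.
    val responseCounter = Collections.synchronizedList(new JArrayList[Int]())
    params.foreach { p =>
      Future(println(s"deploying $p")).onComplete(_ => responseCounter.add(1))
    }

    // Poll until all futures have reported back (the diff sleeps 5000 ms per check).
    while (responseCounter.size() < params.length) Thread.sleep(100)
    println("all zones deployed")
    pool.shutdown()
  }
}

Because onComplete fires for both Success and Failure, the barrier releases even when a workspace deployment throws, which is also why the new try/finally around the whole method can safely clear the shared session maps afterward.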
