0711 Release Staging (#675)
* initial commit

* Changing Serverless to High-Concurrency (#706)

* Changing Serverless to High-Concurrency

* minor changes

---------

Co-authored-by: Sourav Banerjee <30810740+Sourav692@users.noreply.github.com>
Co-authored-by: Daniel Tomes <10840635+GeekSheikh@users.noreply.github.com>

* Spark events bronze par session (#678)

* initial commit

* Test Spark per session

* notes from meeting

* persession implemented

* persession implemented

* persession implemented

* persession implemented

* persession implemented

* persession implemented

* pr review implemented

* 686 - SparkEvents Executor ID Schema Handler

* global Session - initialize configs

* Concurrent Writes - Table Lock for Parallelized Loads (#691)

* initial commit -- working

* code modified preWriteActions added

* re-instated perform retry for legacy deployments and added comments

* added logging details

* clear sessionsMap on batch runner

---------

Co-authored-by: sriram251-code <sriram.mohanty@databricks.com>

* minor fixes

---------

Co-authored-by: Daniel Tomes <10840635+GeekSheikh@users.noreply.github.com>
Co-authored-by: geeksheikh <geeksheikh@users.noreply.github.com>

* adding new snapshots to bronze layer (#684)

* adding new snapshots to bronze layer

* changed all the singular asset names to plural

* adding transform function for new bronze snaps

* changes applied to improve schema quality

---------

Co-authored-by: Daniel Tomes <10840635+GeekSheikh@users.noreply.github.com>

* apiURL "/" removed and dbsql added (#699)

* bug fix

* bug fix

* bug fix

* bug fix

* Improve acquisition of Cloud Provider and OrgID (#708)

* Improve acquisition of Cloud Provider and OrgID

* Improve acquisition of Cloud Provider and OrgID

* Modularize getOrgID function

* removed old commented version of code

---------

Co-authored-by: Sourav Banerjee <30810740+Sourav692@users.noreply.github.com>
Co-authored-by: Daniel Tomes <10840635+GeekSheikh@users.noreply.github.com>

* Update SilverTransforms.scala (#703)

* Overwatch on photon broadcast exchange perf issue (#705)

* Change the Join to Shuffle_hash Join for collectEventLogPaths

* Change the Join to Shuffle_hash Join for collectEventLogPaths

* adding pagination logic for job-runs api (#723)

* 729 - enable clusterEvents merge Insert (#730)

* enable clusterEvents merge Insert

* added comments

---------

Co-authored-by: geeksheikh <geeksheikh@users.noreply.github.com>

* auditRaw - mergeInsertOnly (#738)

* enable clusterEvents merge Insert

* added comments

* 737 - dateGlobFix and auditLogRaw mergeInserts

---------

Co-authored-by: geeksheikh <geeksheikh@users.noreply.github.com>

* implemented (#752)

* asStrings implemented for apicallv2 (#707)

* 749 fill meta improved (#753)

* 749 fill meta improved

* put tsPartVal in clsf back to 16

---------

Co-authored-by: geeksheikh <geeksheikh@users.noreply.github.com>

* cleaning jobs_snap_bronze new_cluster field (#732)

Co-authored-by: geeksheikh <geeksheikh@users.noreply.github.com>

* dbu and cost calculations fixed (#760)

* dbu calcs corrected

* readd aliases

* add runtime_engine to fillforward

* added a few comments to funcs

* corrected workerDBU Cost Value

* enabled remote getWorkspaceByDatabase (#754)

Co-authored-by: geeksheikh <geeksheikh@users.noreply.github.com>

* improved first run Impute clusterSpec (#759)

* excluded scope enhanced (#740)

* excluded scope enhanced

* review comment implemented

* modified lowerCase logic

---------

Co-authored-by: Daniel Tomes <10840635+GeekSheikh@users.noreply.github.com>

* adding temp location, start and end time in jobs runs list api (#755)

* adding temp location, start and end time in jobs runs list api

* change in jobRunsList api call

* removed default apiVersion from new apply method

* adding fix for jobs runs list api

* adding code to cleanse duplicate cols in JobRunsList transform, and added new bronze snapshots in target

* reading mount source from csv implemented (#695)

* reading mount source from csv implemented

* driver workspace should not call search/mount to get source

* review comment implemented

* review comment implemented

* Reading config from delta implemented. (#713)

* reading config from delta implemented.
skip mount point check for AWS added.

* review comment implemented

* review comment implemented

* review comment implemented

* review comment implemented

* review comment implemented

* merge conflict removed

* shuffle partition changed to String (#717)

* shuffle partition changed to String

* comments implemented

* comments implemented

* comments implemented

* comments implemented

* comments implemented

* comments implemented

* comments implemented

* test cases added

* test cases added

* adding generic api calls function (#756)

* adding generic api calls function

* adding an empty map as return in APIMeta Trait for def getAPIJsonQuery

* adding function getParallelAPIParams

* implemented code review comments

* removed commented lines

* one workspace instance per workspace deployment (#774)

* adding cluster type in jrcp view (#778)

* improved spark conf handler and optimized confs (#773)

* mount mapping validation added (#777)

* mount mapping validation added

* review comments implemented

* review comments implemented

* review comments implemented

* review comments implemented

* Integration Testing - Bug Fixes (#782)

* added persistAndLoad to all writes with tableLocking

* don't perform data validation if path validation fails -- protects against first-run failures especially

* fix excludedScopes

* null config handlers plus proxy scope,key error handler

* added persistAndLoad to all writes with tableLocking

* don't perform data validation if path validation fails -- protects against first-run failures especially

* fix excludedScopes

* null config handlers plus proxy scope,key error handler

* cleanup

* debugging

* fixed futures executions

* additional fixes

* dbu cost fix

* getOrgID bug fix

* target exists enhancement for delta target path validation

* getWorkspaceByDatabase -- cross-cloud remote workspace enabled

* added experimental flag to jrsnapshot and enabled manual module disabling

* rollback and module mapping

---------

Co-authored-by: geeksheikh <geeksheikh@users.noreply.github.com>

---------

Co-authored-by: geeksheikh <geeksheikh@users.noreply.github.com>
Co-authored-by: Sourav Banerjee <109206082+souravbaner-da@users.noreply.github.com>
Co-authored-by: Sourav Banerjee <30810740+Sourav692@users.noreply.github.com>
Co-authored-by: Sriram Mohanty <69749553+sriram251-code@users.noreply.github.com>
Co-authored-by: Aman <91308367+aman-db@users.noreply.github.com>
6 people committed Mar 28, 2023
1 parent 9aab1a2 commit d9fb1dc
Showing 31 changed files with 1,671 additions and 672 deletions.
155 changes: 150 additions & 5 deletions src/main/scala/com/databricks/labs/overwatch/ApiCallV2.scala
@@ -1,5 +1,6 @@
package com.databricks.labs.overwatch

import com.databricks.labs.overwatch.ApiCallV2.sc
import com.databricks.labs.overwatch.pipeline.PipelineFunctions
import com.databricks.labs.overwatch.utils._
import com.fasterxml.jackson.databind.ObjectMapper
@@ -12,7 +13,12 @@ import org.json.JSONObject
import scalaj.http.{HttpOptions, HttpResponse}

import java.util
import java.util.Collections
import java.util.concurrent.Executors
import scala.annotation.tailrec
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
import scala.math.Ordered.orderingToOrdered
import scala.util.{Failure, Success}

/**
* Companion object for APICallV2.
@@ -56,15 +62,16 @@ object ApiCallV2 extends SparkSessionWrapper {
* @param accumulator Keeps track of the number of API requests.
* @return
*/
def apply(apiEnv: ApiEnv, apiName: String, queryMap: Map[String, String], tempSuccessPath: String, accumulator: LongAccumulator): ApiCallV2 = {
def apply(apiEnv: ApiEnv, apiName: String, queryMap: Map[String, String], tempSuccessPath: String
): ApiCallV2 = {
new ApiCallV2(apiEnv)
.setEndPoint(apiName)
.buildMeta(apiName)
.setQueryMap(queryMap)
.setSuccessTempPath(tempSuccessPath)
.setAccumulator(accumulator)
}


/**
* Companion object which takes three parameters and initialises the ApiCallV2.
*
@@ -97,6 +104,25 @@ object ApiCallV2 extends SparkSessionWrapper {
.setApiV(apiVersion)
}

/**
* Companion apply which takes five parameters and initialises the ApiCallV2 with an explicit API version.
*
* @param apiEnv          Api environment details.
* @param apiName         Name of the API endpoint to call.
* @param queryMap        Query parameters for the API request.
* @param tempSuccessPath Temp path used to persist successful responses.
* @param apiVersion      Version of the API to call.
* @return
*/
def apply(apiEnv: ApiEnv, apiName: String, queryMap: Map[String, String],
tempSuccessPath: String, apiVersion: Double): ApiCallV2 = {
new ApiCallV2(apiEnv)
.setEndPoint(apiName)
.buildMeta(apiName)
.setQueryMap(queryMap)
.setSuccessTempPath(tempSuccessPath)
.setApiV(apiVersion)
}

}

/**
@@ -127,7 +153,6 @@ class ApiCallV2(apiEnv: ApiEnv) extends SparkSessionWrapper {
private var _queryMap: Map[String, String] = Map[String, String]()
private var _accumulator: LongAccumulator = sc.longAccumulator("ApiAccumulator") //Multithreaded call accumulator keeps track of the requests.

protected def accumulator: LongAccumulator = _accumulator
protected def apiSuccessCount: Int = _apiSuccessCount

protected def apiFailureCount: Int = _apiFailureCount
@@ -232,6 +257,8 @@ class ApiCallV2(apiEnv: ApiEnv) extends SparkSessionWrapper {
this
}

def asStrings: Array[String] = _apiResponseArray.toArray(new Array[String](_apiResponseArray.size))

/**
* Setting up the api name and api metadata for that api.
*
@@ -549,7 +576,7 @@ class ApiCallV2(apiEnv: ApiEnv) extends SparkSessionWrapper {
* Performs api calls in parallel.
* @return
*/
def executeMultiThread(): util.ArrayList[String] = {
def executeMultiThread(accumulator: LongAccumulator): util.ArrayList[String] = {
@tailrec def executeThreadedHelper(): util.ArrayList[String] = {
val response = getResponse
responseCodeHandler(response)
@@ -607,7 +634,6 @@ class ApiCallV2(apiEnv: ApiEnv) extends SparkSessionWrapper {
responseCodeHandler(response)
_apiResponseArray.add(response.body)
if (apiMeta.storeInTempLocation && successTempPath.nonEmpty) {
accumulator.add(1)
if (apiEnv.successBatchSize <= _apiResponseArray.size()) { //Checking if it's the right time to write the batches into persistent storage
val responseFlag = PipelineFunctions.writeMicroBatchToTempLocation(successTempPath.get, _apiResponseArray.toString)
if (responseFlag) { //Clearing the resultArray in-case of successful write
@@ -646,6 +672,125 @@ class ApiCallV2(apiEnv: ApiEnv) extends SparkSessionWrapper {
}
}

/**
* Function to make parallel API calls. Currently this function supports only SqlQueryHistory and ClusterEvents
* @param endpoint
* @param jsonInput
* @param config
* @return
*/
def makeParallelApiCalls(endpoint: String, jsonInput: Map[String, String], config: Config): String = {
val tempEndpointLocation = endpoint.replaceAll("/","")
val acc = sc.longAccumulator(tempEndpointLocation)

val tmpSuccessPath = if(jsonInput.contains("tmp_success_path")) jsonInput.get("tmp_success_path").get
else s"${config.tempWorkingDir}/${tempEndpointLocation}/${System.currentTimeMillis()}"

val tmpErrorPath = if(jsonInput.contains("tmp_error_path")) jsonInput.get("tmp_error_path").get
else s"${config.tempWorkingDir}/errors/${tempEndpointLocation}/${System.currentTimeMillis()}"

var apiResponseArray = Collections.synchronizedList(new util.ArrayList[String]())
var apiErrorArray = Collections.synchronizedList(new util.ArrayList[String]())
val apiResponseCounter = Collections.synchronizedList(new util.ArrayList[Int]())
implicit val ec: ExecutionContextExecutor = ExecutionContext.fromExecutor(
Executors.newFixedThreadPool(config.apiEnv.threadPoolSize))
val apiMetaFactoryObj = new ApiMetaFactory().getApiClass(endpoint)
val dataFrame_column = apiMetaFactoryObj.dataframeColumn
val parallelApiCallsParams = apiMetaFactoryObj.getParallelAPIParams(jsonInput)
var startValue = parallelApiCallsParams.get("start_value").get.toLong
val endValue = parallelApiCallsParams.get("end_value").get.toLong
val incrementCounter = parallelApiCallsParams.get("increment_counter").get.toLong
val finalResponseCount = parallelApiCallsParams.get("final_response_count").get.toLong

while (startValue < endValue){
val jsonQuery = apiMetaFactoryObj.getAPIJsonQuery(startValue, endValue, jsonInput)

//call future
val future = Future {
val apiObj = ApiCallV2(
config.apiEnv,
endpoint,
jsonQuery,
tempSuccessPath = tmpSuccessPath
).executeMultiThread(acc)

synchronized {
apiObj.forEach(
obj=>if(obj.contains(dataFrame_column)){
apiResponseArray.add(obj)
}
)
if (apiResponseArray.size() >= config.apiEnv.successBatchSize) {
PipelineFunctions.writeMicroBatchToTempLocation(tmpSuccessPath, apiResponseArray.toString)
apiResponseArray = Collections.synchronizedList(new util.ArrayList[String]())
}
}
}
future.onComplete {
case Success(_) =>
apiResponseCounter.add(1)

case Failure(e) =>
if (e.isInstanceOf[ApiCallFailureV2]) {
synchronized {
apiErrorArray.add(e.getMessage)
if (apiErrorArray.size() >= config.apiEnv.errorBatchSize) {
PipelineFunctions.writeMicroBatchToTempLocation(tmpErrorPath, apiErrorArray.toString)
apiErrorArray = Collections.synchronizedList(new util.ArrayList[String]())
}
}
logger.log(Level.ERROR, "Future failure message: " + e.getMessage, e)
}
apiResponseCounter.add(1)
}
startValue = startValue + incrementCounter
}

val timeoutThreshold = config.apiEnv.apiWaitingTime // 5 minutes
var currentSleepTime = 0
var accumulatorCountWhileSleeping = acc.value
while (apiResponseCounter.size() < finalResponseCount && currentSleepTime < timeoutThreshold) {
//As we are using Futures and running multiple threads in parallel, we check whether all the threads have
// completed execution. If we have not received responses from all the threads, we wait for 5
// seconds and re-validate the count.
if (currentSleepTime > 120000) //printing the waiting message only if the waiting time is more than 2 minutes.
{
println(
s"""Waiting for other queued API Calls to complete; cumulative wait time ${currentSleepTime / 1000}
|seconds; API responses yet to be received: ${finalResponseCount - apiResponseCounter.size()}""".stripMargin)
}
Thread.sleep(5000)
currentSleepTime += 5000
if (accumulatorCountWhileSleeping < acc.value) { //new API response received while waiting.
currentSleepTime = 0 //resetting the sleep time.
accumulatorCountWhileSleeping = acc.value
}
}
if (apiResponseCounter.size() != finalResponseCount) { // Checking whether all the API responses have been received.
logger.log(Level.ERROR,
s"""Unable to receive all the ${endpoint} api responses; Api response
|received ${apiResponseCounter.size()};Api response not
|received ${finalResponseCount - apiResponseCounter.size()}""".stripMargin)
throw new Exception(
s"""Unable to receive all the ${endpoint} api responses; Api response received
|${apiResponseCounter.size()};
|Api response not received ${finalResponseCount - apiResponseCounter.size()}""".stripMargin)
}
if (apiResponseArray.size() > 0) { //If the response array didn't reach the batch size, write the
// remaining responses to persistent storage as a final step.
PipelineFunctions.writeMicroBatchToTempLocation(tmpSuccessPath, apiResponseArray.toString)
apiResponseArray = Collections.synchronizedList(new util.ArrayList[String]())


}
if (apiErrorArray.size() > 0) { //If the error array didn't reach the batch size, write the
// remaining errors to persistent storage as a final step.
PipelineFunctions.writeMicroBatchToTempLocation(tmpErrorPath, apiErrorArray.toString)
apiErrorArray = Collections.synchronizedList(new util.ArrayList[String]())
}
tmpSuccessPath
}

}
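
For orientation, the sketch below shows one way the new makeParallelApiCalls helper might be driven for the SQL query history endpoint. It is illustrative only and not part of this commit: the endpoint string registered in ApiMetaFactory, the accessibility of Config.apiEnv from calling code, and the time-window arithmetic are all assumptions.

package com.databricks.labs.overwatch

import com.databricks.labs.overwatch.utils.Config

object ParallelApiCallExample {

  // Hypothetical driver for a parallel SQL query history pull.
  // Returns the temp success path holding the micro-batched JSON responses,
  // which the caller could then read back with spark.read.json(...).
  def pullSqlQueryHistory(config: Config, fromMs: Long, untilMs: Long): String = {
    val endpoint = "sql/history/queries" // assumed ApiMetaFactory key for SqlQueryHistoryApi
    val hourMs = 1000L * 60 * 60
    val sliceCount = (untilMs - fromMs + hourMs - 1) / hourMs // ceiling: one API call per hour slice
    val jsonInput = Map(
      "start_value" -> fromMs.toString,        // consumed by SqlQueryHistoryApi.getParallelAPIParams
      "end_value" -> untilMs.toString,
      "increment_counter" -> hourMs.toString,  // advance the window one hour per call
      "final_response_count" -> sliceCount.toString
    )
    new ApiCallV2(config.apiEnv).makeParallelApiCalls(endpoint, jsonInput, config)
  }
}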


121 changes: 119 additions & 2 deletions src/main/scala/com/databricks/labs/overwatch/ApiMeta.scala
@@ -1,7 +1,7 @@
package com.databricks.labs.overwatch

import com.databricks.dbutils_v1.DBUtilsHolder.dbutils
import com.databricks.labs.overwatch.utils.ApiEnv
import com.databricks.labs.overwatch.utils.{ApiEnv, TimeTypes}
import com.fasterxml.jackson.databind.JsonNode
import org.apache.log4j.{Level, Logger}
import scalaj.http.{Http, HttpRequest}
Expand Down Expand Up @@ -107,7 +107,17 @@ trait ApiMeta {
logger.log(Level.INFO, s"""Proxy has been set to IP: ${apiEnv.proxyHost.get} PORT:${apiEnv.proxyPort.get}""")
}
if (apiEnv.proxyUserName.nonEmpty && apiEnv.proxyPasswordScope.nonEmpty && apiEnv.proxyPasswordKey.nonEmpty) {
val password = dbutils.secrets.get(scope = apiEnv.proxyPasswordScope.get, apiEnv.proxyPasswordKey.get)
val password = try {
dbutils.secrets.get(scope = apiEnv.proxyPasswordScope.get, apiEnv.proxyPasswordKey.get)
} catch {
case e: IllegalArgumentException if e.getMessage.contains("Secret does not exist") =>
val failMsg =
s"""Error getting proxy secret details using:
|ProxyPasswordScope: ${apiEnv.proxyPasswordScope}
|ProxyPasswordKey: ${apiEnv.proxyPasswordKey}
|""".stripMargin
throw new Exception(failMsg, e)
}
request = request.proxyAuth(apiEnv.proxyUserName.get, password)
logger.log(Level.INFO, s"""Proxy UserName set to IP: ${apiEnv.proxyUserName.get} scope:${apiEnv.proxyPasswordScope.get} key:${apiEnv.proxyPasswordKey.get}""")
}
@@ -135,6 +145,16 @@ trait ApiMeta {
|""".stripMargin
}

private[overwatch] def getAPIJsonQuery(startValue: Long, endValue: Long, jsonInput: Map[String, String]): Map[String, String] = {
logger.log(Level.INFO, s"""Needs to be overridden by the specific API to manipulate the input JSON query""")
Map[String, String]()
}

private[overwatch] def getParallelAPIParams(jsonInput: Map[String, String]): Map[String, String] = {
logger.log(Level.INFO, s"""Needs to be overridden by the specific API to initialize the parallel API call function""")
Map[String, String]()
}

}

/**
@@ -157,6 +177,11 @@ class ApiMetaFactory {
case "clusters/resize" => new ClusterResizeApi
case "jobs/runs/get" => new JobRunGetApi
case "dbfs/search-mounts" => new DbfsSearchMountsApi
case "jobs/runs/list" => new JobRunsApi
case "libraries/all-cluster-statuses" => new ClusterLibraryApi
case "policies/clusters/list" => new ClusterPolicesApi
case "token/list" => new TokensApi
case "global-init-scripts" => new GlobalInitsScriptsApi
case _ => new UnregisteredApi
}
logger.log(Level.INFO, meta.toString)
@@ -200,6 +225,32 @@ class SqlQueryHistoryApi extends ApiMeta {
// logger.info(s"DEBUG - NEXT_PAGE_TOKEN = ${_jsonValue}")
requestMap.filterNot { case (k, _) => k.toLowerCase.startsWith("filter_by")} ++ Map(s"page_token" -> s"${_jsonValue}")
}

private[overwatch] override def getAPIJsonQuery(startValue: Long, endValue: Long, jsonInput: Map[String, String]): Map[String, String] = {
val (startTime, endTime) = if ((endValue - startValue)/(1000*60*60) > 1) {
(startValue,
startValue+(1000*60*60))
}
else{
(startValue,
endValue)
}
Map(
"max_results" -> "50",
"include_metrics" -> "true",
"filter_by.query_start_time_range.start_time_ms" -> s"$startTime",
"filter_by.query_start_time_range.end_time_ms" -> s"$endTime"
)
}

private[overwatch] override def getParallelAPIParams(jsonInput: Map[String, String]): Map[String, String] = {
Map(
"start_value" -> s"""${jsonInput.get("start_value").get.toLong}""",
"end_value" -> s"""${jsonInput.get("end_value").get.toLong}""",
"increment_counter" -> s"""${jsonInput.get("increment_counter").get.toLong}""",
"final_response_count" -> s"""${jsonInput.get("final_response_count").get.toLong}"""
)
}
}

class WorkspaceListApi extends ApiMeta {
@@ -259,4 +310,70 @@ class ClusterEventsApi extends ApiMeta {
setDataframeColumn("events")
setApiCallType("POST")
setStoreInTempLocation(true)

private[overwatch] override def getAPIJsonQuery(startValue: Long, endValue: Long,jsonInput: Map[String, String]): Map[String, String] = {
val clusterIDs = jsonInput.get("cluster_ids").get.split(",").map(_.trim).toArray
val startTime = jsonInput.get("start_time").get.toLong
val endTime = jsonInput.get("end_time").get.toLong

Map("cluster_id" -> s"""${clusterIDs(startValue.toInt)}""",
"start_time" -> s"""${startTime}""",
"end_time" -> s"""${endTime}""",
"limit" -> "500"
)
}

private[overwatch] override def getParallelAPIParams(jsonInput: Map[String, String]): Map[String, String] = {
Map(
"start_value" -> s"""${jsonInput.get("start_value").get.toLong}""",
"end_value" -> s"""${jsonInput.get("end_value").get.toLong}""",
"increment_counter" -> s"""${jsonInput.get("increment_counter").get.toLong}""",
"final_response_count" -> s"""${jsonInput.get("final_response_count").get.toLong}"""
)
}

}
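
// Note (illustrative, not part of this commit): unlike SqlQueryHistoryApi, the parallel
// driver for cluster events iterates over cluster IDs rather than time slices, so the
// jsonInput handed to makeParallelApiCalls is expected to look roughly like this
// hypothetical map (IDs and timestamps are made up):
//   Map(
//     "cluster_ids" -> "0123-456789-abc123,0123-456789-def456", // startValue indexes into this list
//     "start_time" -> "1672531200000",                          // epoch ms, shared by every call
//     "end_time" -> "1672617600000",
//     "start_value" -> "0",                                     // first cluster index
//     "end_value" -> "2",                                       // number of cluster IDs
//     "increment_counter" -> "1",                               // advance one cluster per call
//     "final_response_count" -> "2"                             // one response expected per cluster
//   )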

class JobRunsApi extends ApiMeta {
setDataframeColumn("runs")
setApiCallType("GET")
setPaginationKey("has_more")
setIsDerivePaginationLogic(true)
setStoreInTempLocation(true)

private[overwatch] override def hasNextPage(jsonObject: JsonNode): Boolean = {
jsonObject.get(paginationKey).asBoolean()
}

private[overwatch] override def getPaginationLogic(jsonObject: JsonNode, requestMap: Map[String, String]): Map[String, String] = {
val limit = Integer.parseInt(requestMap.get("limit").get)
var offset = Integer.parseInt(requestMap.get("offset").get)
val expand_tasks = requestMap.get("expand_tasks").get
offset = offset + limit
Map(
"limit" -> s"${limit}",
"expand_tasks" -> s"${expand_tasks}",
"offset" -> s"${offset}"
)
}
}

class ClusterLibraryApi extends ApiMeta {
setDataframeColumn("statuses")
setApiCallType("GET")
}

class ClusterPolicesApi extends ApiMeta {
setDataframeColumn("policies")
setApiCallType("GET")
}

class TokensApi extends ApiMeta {
setDataframeColumn("token_infos")
setApiCallType("GET")
}

class GlobalInitsScriptsApi extends ApiMeta {
setDataframeColumn("scripts")
setApiCallType("GET")
}
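
As an aside, the simple GET classes above make the extension pattern visible: supporting another list-style endpoint only takes an ApiMeta subclass plus one extra case in ApiMetaFactory.getApiClass. A hedged illustration follows; the endpoint and class below are hypothetical and do not exist in this commit.

class InstancePoolsApi extends ApiMeta {
  setDataframeColumn("instance_pools") // assumed top-level array key in the instance-pools/list response
  setApiCallType("GET")
}

// ...plus the matching registration inside ApiMetaFactory.getApiClass:
//   case "instance-pools/list" => new InstancePoolsApi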