0800 release (#1144)
* Initial commit

* traceability implemented (#1102)

* traceability implemented

* code review implemented

* missed code implemented (#1105)

* Initial commit

* traceability implemented (#1102)

* traceability implemented

* code review implemented

* missed code implemented

* missed code implemented

---------

Co-authored-by: Guenia Izquierdo <guenia.izquierdo@databricks.com>

* Added proper exception for Spark Stream Gold if progress c… (#1085)

* Initial commit

* 09-Nov-23: Added proper exception for Spark Stream Gold if progress column contains only null in SparkEvents_Bronze

---------

Co-authored-by: Guenia Izquierdo <guenia.izquierdo@databricks.com>
Co-authored-by: Sourav Banerjee <30810740+Sourav692@users.noreply.github.com>

* Gracefully Handle Exception for NotebookCommands_Gold (#1095)

* Initial commit

* Gracefully Handle Exception for NotebookCommands_Gold

* Convert the check in buildNotebookCommandsFact to a single OR clause

---------

Co-authored-by: Guenia Izquierdo <guenia.izquierdo@databricks.com>
Co-authored-by: Sourav Banerjee <30810740+Sourav692@users.noreply.github.com>

* code missed in merge (#1120)

* Fix Helper Method to Instantiate Remote Workspaces (#1110)

* Initial commit

* Change getRemoteWorkspaceByPath and getWorkspaceByDatabase to take in RemoteWorkspace

* Remove Unnecessary println Statements

---------

Co-authored-by: Guenia Izquierdo <guenia.izquierdo@databricks.com>

* Ensure we test the write into a partitioned storage_prefix (#1088)

* Initial commit

* Ensure we test the write into a partitioned storage_prefix

* silver warehouse spec fix (#1121)

* added missed copy-pasta (#1129)

* Exclude cluster logs in S3 root bucket (#1118)

* Exclude cluster logs in S3 root bucket

* Omit cluster log paths pointing to s3a as well

* implemented recon (#1116)

* implemented recon

* docs added

* file path change

* review comments implemented

* Added ShuffleFactor to NotebookCommands (#1124)

Co-authored-by: Sourav Banerjee <30810740+Sourav692@users.noreply.github.com>

* disabled traceability (#1130)

* Added JobRun_Silver in buildClusterStateFact for Cluster E… (#1083)

* Initial commit

* 08-Nov-23: Added JobRun_Silver in buildClusterStateFact for Cluster End Time Imputation

* Impute Terminating Events in CLSF from JR_Silver

* Impute Terminating Events in CLSD

* Impute Terminating Events in CLSD

* Change CLSF to original 0730 version

* Change CLSF to original 0730 version

* Added cluster_spec in CLSD to get job Cluster only

* Make the variable names in buildClusterStateDetail more descriptive

* Make the variable names in buildClusterStateDetail more descriptive

---------

Co-authored-by: Guenia Izquierdo <guenia.izquierdo@databricks.com>
Co-authored-by: Sourav Banerjee <30810740+Sourav692@users.noreply.github.com>

* Sys table audit log integration (#1122)

* system table integration with audit log

* adding code to resolve issues with response col

* fixed timestamp issue

* adding print statement for from and until time

* adding fix for azure

* removed comments

* removed comments and print statements

* removed comments

* implemented code review comments

* implemented code review comments

* adding review comment

* Sys table integration multi account (#1131)

* added code changes for multi account deployment

* code for multi account system table integration

* Sys table integration multi account (#1132)

* added code changes for multi account deployment

* code for multi account system table integration

* adding code for system table migration check

* changing exception for empty audit log from system table

* adding code to handle sql_endpoint in configs and fix in migration validation (#1133)

* corner case commit (#1134)

* Handle CLSD Cluster Impute when jrcp and clusterSpec is Empty (#1135)

* Handle CLSD Cluster Impute when jrcp and clusterSpec is Empty

* Exclude last_state from clsd as it is not needed in the logic.

---------

Co-authored-by: Sourav Banerjee <30810740+Sourav692@users.noreply.github.com>

* Exclude 2011 and 2014 as dependency module for 2019 (#1136)

* Exclude 2011 and 2014 as dependency module for 2019

* Added comment in CLSD for understandability

---------

Co-authored-by: Sourav Banerjee <30810740+Sourav692@users.noreply.github.com>

* corner case commit (#1137)

* Update version

* adding fix for empty EH config for system tables (#1140)

* corner case commit (#1142)

* adding fix for empty audit log for warehouse_spec_silver (#1141)

* recon columns removed (#1143)

* recon columns removed

* recon columns removed

---------

Co-authored-by: Sriram Mohanty <69749553+sriram251-code@users.noreply.github.com>
Co-authored-by: Sourav Banerjee <109206082+souravbaner-da@users.noreply.github.com>
Co-authored-by: Sourav Banerjee <30810740+Sourav692@users.noreply.github.com>
Co-authored-by: Aman <91308367+aman-db@users.noreply.github.com>
5 people committed Jan 25, 2024
1 parent 7392028 commit e82f5ff
Showing 27 changed files with 1,420 additions and 341 deletions.
2 changes: 1 addition & 1 deletion build.sbt
@@ -2,7 +2,7 @@ name := "overwatch"

organization := "com.databricks.labs"

version := "0.7.2.2.1"
version := "0.8.0.0"

scalaVersion := "2.12.12"
scalacOptions ++= Seq("-Xmax-classfile-name", "78")
MultiWorkspaceDeployment.scala
@@ -73,6 +73,9 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {

private var _pipelineSnapTime: Long = _

private var _systemTableAudit: String = "system.access.audit"

private def systemTableAudit: String = _systemTableAudit

private def setConfigLocation(value: String): this.type = {
_configLocation = value
@@ -113,30 +116,7 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
val tokenSecret = TokenSecret(config.secret_scope, config.secret_key_dbpat)
val badRecordsPath = s"${config.storage_prefix}/${config.workspace_id}/sparkEventsBadrecords"
// TODO -- ISSUE 781 - quick fix to support non-json audit logs but needs to be added back to external parameters
val auditLogFormat = spark.conf.getOption("overwatch.auditlogformat").getOrElse("json")
val auditLogConfig = if (s"${config.cloud.toLowerCase()}" != "azure") {
AuditLogConfig(rawAuditPath = config.auditlogprefix_source_path, auditLogFormat = auditLogFormat)
} else {

val ehStatePath = s"${config.storage_prefix}/${config.workspace_id}/ehState"
val isAAD = config.aad_client_id.nonEmpty &&
config.aad_tenant_id.nonEmpty &&
config.aad_client_secret_key.nonEmpty &&
config.eh_conn_string.nonEmpty
val azureLogConfig = if(isAAD){
AzureAuditLogEventhubConfig(connectionString = config.eh_conn_string.get, eventHubName = config.eh_name.get
, auditRawEventsPrefix = ehStatePath,
azureClientId = Some(config.aad_client_id.get),
azureClientSecret = Some(dbutils.secrets.get(config.secret_scope, key = config.aad_client_secret_key.get)),
azureTenantId = Some(config.aad_tenant_id.get),
azureAuthEndpoint = config.aad_authority_endpoint.getOrElse("https://login.microsoftonline.com/")
)
}else{
val ehConnString = s"{{secrets/${config.secret_scope}/${config.eh_scope_key.get}}}"
AzureAuditLogEventhubConfig(connectionString = ehConnString, eventHubName = config.eh_name.get, auditRawEventsPrefix = ehStatePath)
}
AuditLogConfig(azureAuditLogEventhubConfig = Some(azureLogConfig))
}
val auditLogConfig = getAuditlogConfigs(config)
val interactiveDBUPrice: Double = config.interactive_dbu_price
val automatedDBUPrice: Double = config.automated_dbu_price
val sqlComputerDBUPrice: Double = config.sql_compute_dbu_price
@@ -152,6 +132,7 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
val stringDate = dateFormat.format(primordialDateString)
val apiEnvConfig = getProxyConfig(config)
val temp_dir_path = config.temp_dir_path.getOrElse("")
val sql_endpoint = config.sql_endpoint

val params = OverwatchParams(
auditLogConfig = auditLogConfig,
@@ -165,7 +146,8 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
workspace_name = Some(customWorkspaceName),
externalizeOptimize = true,
apiEnvConfig = Some(apiEnvConfig),
tempWorkingDir = temp_dir_path
tempWorkingDir = temp_dir_path,
sqlEndpoint = sql_endpoint
)
MultiWorkspaceParams(JsonUtils.objToJson(params).compactString,
s"""${config.api_url}""",
@@ -186,7 +168,61 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
}
}

private def getProxyConfig(config: MultiWorkspaceConfig): ApiEnvConfig = {
private def getAuditLogConfigForSystemTable(config: MultiWorkspaceConfig): AuditLogConfig = {
if(config.sql_endpoint.getOrElse("").isEmpty) {
val auditLogFormat = "delta"
AuditLogConfig(rawAuditPath = config.auditlogprefix_source_path,
auditLogFormat = auditLogFormat, systemTableName = Some(systemTableAudit))
}
else {
val auditLogFormat = "delta"
AuditLogConfig(rawAuditPath = config.auditlogprefix_source_path,
auditLogFormat = auditLogFormat, systemTableName = Some(systemTableAudit),
sqlEndpoint = config.sql_endpoint)
}
}

private def getAuditLogConfigForAwsGcp(config: MultiWorkspaceConfig): AuditLogConfig = {
val auditLogFormat = spark.conf.getOption("overwatch.auditlogformat").getOrElse("json")
AuditLogConfig(rawAuditPath = config.auditlogprefix_source_path, auditLogFormat = auditLogFormat)
}

private def getAuditLogConfigForzure(config: MultiWorkspaceConfig): AuditLogConfig = {
val ehStatePath = s"${config.storage_prefix}/${config.workspace_id}/ehState"
val isAAD = config.aad_client_id.nonEmpty &&
config.aad_tenant_id.nonEmpty &&
config.aad_client_secret_key.nonEmpty &&
config.eh_conn_string.nonEmpty
val azureLogConfig = if (isAAD) {
AzureAuditLogEventhubConfig(connectionString = config.eh_conn_string.get, eventHubName = config.eh_name.get
, auditRawEventsPrefix = ehStatePath,
azureClientId = Some(config.aad_client_id.get),
azureClientSecret = Some(dbutils.secrets.get(config.secret_scope, key = config.aad_client_secret_key.get)),
azureTenantId = Some(config.aad_tenant_id.get),
azureAuthEndpoint = config.aad_authority_endpoint.getOrElse("https://login.microsoftonline.com/")
)
} else {
val ehConnString = s"{{secrets/${config.secret_scope}/${config.eh_scope_key.get}}}"
AzureAuditLogEventhubConfig(connectionString = ehConnString, eventHubName = config.eh_name.get, auditRawEventsPrefix = ehStatePath)
}
AuditLogConfig(azureAuditLogEventhubConfig = Some(azureLogConfig))
}

private def getAuditlogConfigs(config: MultiWorkspaceConfig): AuditLogConfig = {
if (config.auditlogprefix_source_path.getOrElse("").toLowerCase.equals("system")) {
getAuditLogConfigForSystemTable(config)
} else {
if(s"${config.cloud.toLowerCase()}" != "azure") {
getAuditLogConfigForAwsGcp(config)
}
else {
getAuditLogConfigForzure(config)
}
}
}


private def getProxyConfig(config: MultiWorkspaceConfig): ApiEnvConfig = {
val apiProxyConfig = ApiProxyConfig(config.proxy_host, config.proxy_port, config.proxy_user_name, config.proxy_password_scope, config.proxy_password_key)
val apiEnvConfig = ApiEnvConfig(config.success_batch_size.getOrElse(200),
config.error_batch_size.getOrElse(500),
@@ -416,7 +452,7 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
.as[MultiWorkspaceConfig]
.filter(_.active)
.collect()
if(multiWorkspaceConfig.length < 1){
if (multiWorkspaceConfig.length < 1) {
throw new BadConfigException("Config file has 0 record, config file:" + configLocation)
}
multiWorkspaceConfig
@@ -426,7 +462,6 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
logger.log(Level.ERROR, fullMsg)
throw e
}

}


@@ -479,6 +514,7 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {

println("ParallelismLevel :" + parallelism)
val multiWorkspaceConfig = generateMultiWorkspaceConfig(configLocation, deploymentId, outputPath)

snapshotConfig(multiWorkspaceConfig)
val params = DeploymentValidation
.performMandatoryValidation(multiWorkspaceConfig, parallelism)
@@ -492,7 +528,7 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
deploymentReport.appendAll(deploymentReports)
saveDeploymentReport(deploymentReport.toArray, multiWorkspaceConfig.head.storage_prefix, "deploymentReport")
} catch {
case e: Exception =>
case e: Throwable =>
val failMsg = s"FAILED DEPLOYMENT WITH EXCEPTION"
println(failMsg)
logger.log(Level.ERROR, failMsg, e)
@@ -544,4 +580,6 @@ class MultiWorkspaceDeployment extends SparkSessionWrapper {
returnParam
}



}
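The sketch below (not part of the diff) illustrates how the new getAuditlogConfigs routing above selects an audit-log source: an auditlogprefix_source_path of "system" switches ingestion to the system.access.audit table, optionally through a SQL warehouse when sql_endpoint is set, otherwise the existing cloud-specific paths apply. SimpleConfig, the AuditSource objects, and the warehouse path are hypothetical stand-ins for illustration, not the Overwatch API.

// Minimal Scala sketch of the source-selection logic in getAuditlogConfigs.
// SimpleConfig mimics only the MultiWorkspaceConfig fields used for routing.
case class SimpleConfig(
  cloud: String,
  auditlogprefix_source_path: Option[String],
  sql_endpoint: Option[String]
)

sealed trait AuditSource
case object SystemTableViaWarehouse extends AuditSource // system.access.audit read through a SQL warehouse
case object SystemTableDirect       extends AuditSource // system.access.audit read directly as Delta
case object AzureEventHub           extends AuditSource // Event Hub delivery (AAD or connection-string secret)
case object CloudLogDelivery        extends AuditSource // JSON/Delta logs delivered to a storage path (AWS/GCP)

def resolveAuditSource(cfg: SimpleConfig): AuditSource =
  if (cfg.auditlogprefix_source_path.getOrElse("").equalsIgnoreCase("system")) {
    if (cfg.sql_endpoint.getOrElse("").nonEmpty) SystemTableViaWarehouse else SystemTableDirect
  } else if (cfg.cloud.toLowerCase != "azure") {
    CloudLogDelivery
  } else {
    AzureEventHub
  }

// Example: an AWS workspace configured to read audit events from system tables via a SQL warehouse.
val cfg = SimpleConfig("aws", Some("system"), Some("/sql/1.0/warehouses/abc123"))
assert(resolveAuditSource(cfg) == SystemTableViaWarehouse)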
ParamDeserializer.scala
@@ -101,6 +101,8 @@ class ParamDeserializer() extends StdDeserializer[OverwatchParams](classOf[Overw
val rawAuditPath = getOptionString(masterNode, "auditLogConfig.rawAuditPath")
val auditLogFormat = getOptionString(masterNode, "auditLogConfig.auditLogFormat").getOrElse("json")
val azureEventHubNode = getNodeFromPath(masterNode, "auditLogConfig.azureAuditLogEventhubConfig")
val systemTableName = getOptionString(masterNode, "auditLogConfig.systemTableName")
val sqlEndpoint= getOptionString(masterNode, "auditLogConfig.sqlEndpoint")

val azureAuditEventHubConfig = if (azureEventHubNode.nonEmpty) {
val node = azureEventHubNode.get
@@ -121,7 +123,8 @@ class ParamDeserializer() extends StdDeserializer[OverwatchParams](classOf[Overw
None
}

val auditLogConfig = AuditLogConfig(rawAuditPath, auditLogFormat, azureAuditEventHubConfig)
val auditLogConfig = AuditLogConfig(rawAuditPath, auditLogFormat, azureAuditEventHubConfig, systemTableName,
sqlEndpoint)

val dataTarget = if (masterNode.has("dataTarget")) {
Some(DataTarget(
@@ -194,7 +197,7 @@ class ParamDeserializer() extends StdDeserializer[OverwatchParams](classOf[Overw
} else {
None
}

val sql_endpoint = getOptionString(masterNode, "sqlEndpoint")
OverwatchParams(
auditLogConfig,
token,
@@ -208,7 +211,8 @@ class ParamDeserializer() extends StdDeserializer[OverwatchParams](classOf[Overw
workspace_name,
externalizeOptimize,
apiEnvConfig,
tempWorkingDir
tempWorkingDir,
sql_endpoint
)
}
}
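For reference, a hedged sketch of the params JSON the updated deserializer now reads: the two new paths are auditLogConfig.systemTableName and auditLogConfig.sqlEndpoint, plus the top-level sqlEndpoint. The concrete field values and the warehouse path below are illustrative assumptions, not taken from the diff.

import com.fasterxml.jackson.databind.ObjectMapper

// Illustrative OverwatchParams fragment containing the newly deserialized fields.
val json =
  """{
    |  "auditLogConfig": {
    |    "rawAuditPath": "system",
    |    "auditLogFormat": "delta",
    |    "systemTableName": "system.access.audit",
    |    "sqlEndpoint": "/sql/1.0/warehouses/abc123"
    |  },
    |  "sqlEndpoint": "/sql/1.0/warehouses/abc123"
    |}""".stripMargin

// ParamDeserializer resolves dotted paths such as "auditLogConfig.systemTableName";
// the equivalent JSON Pointer lookups are shown here with plain Jackson.
val node = new ObjectMapper().readTree(json)
println(node.at("/auditLogConfig/systemTableName").asText()) // system.access.audit
println(node.at("/auditLogConfig/sqlEndpoint").asText())     // /sql/1.0/warehouses/abc123
println(node.at("/sqlEndpoint").asText())                    // /sql/1.0/warehouses/abc123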